1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: db622a4ce270e235fe83c1e318efcd5102d54cb1 $
19 *
20 * @file src/lib/server/trunk.c
21 * @brief A management API for bonding multiple connections together.
22 *
23 * @copyright 2019-2020 Arran Cudbard-Bell (a.cudbardb@freeradius.org)
24 * @copyright 2019-2020 The FreeRADIUS server project
25 */
26
27#define LOG_PREFIX trunk->log_prefix
28
29#ifdef NDEBUG
30# define TALLOC_GET_TYPE_ABORT_NOOP 1
31#endif
32
35typedef struct trunk_s trunk_t;
36#define _TRUNK_PRIVATE 1
37#include <freeradius-devel/server/trunk.h>
38
39#include <freeradius-devel/server/trigger.h>
40#include <freeradius-devel/util/debug.h>
41#include <freeradius-devel/util/misc.h>
42#include <freeradius-devel/util/syserror.h>
43#include <freeradius-devel/util/minmax_heap.h>
44
45#ifdef HAVE_STDATOMIC_H
46# include <stdatomic.h>
47# ifndef ATOMIC_VAR_INIT
48# define ATOMIC_VAR_INIT(_x) (_x)
49# endif
50#else
51# include <freeradius-devel/util/stdatomic.h>
52#endif
53
54static atomic_uint_fast64_t request_counter = ATOMIC_VAR_INIT(1);
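/* Illustrative sketch, not part of the original file: each new trunk request takes
 * its ID from the counter above with a single atomic increment, which keeps IDs
 * unique even when several trunks allocate requests concurrently.  The exact call
 * and memory ordering shown here are assumptions, not lifted from this file.
 *
 * @code
 *	uint64_t id = atomic_fetch_add_explicit(&request_counter, 1, memory_order_relaxed);
 * @endcode
 */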
55
56#ifdef TESTING_TRUNK
 57static fr_time_t test_time_base;
 58
59static fr_time_t test_time(void)
60{
61 return test_time_base;
62}
63
64#define fr_time test_time
65#endif
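/* Illustrative sketch, not part of the original file: with TESTING_TRUNK defined,
 * fr_time() above resolves to test_time(), so a unit test can drive the clock by
 * adjusting test_time_base before calling into the trunk code.  The helper names
 * below are assumptions used purely for illustration.
 *
 * @code
 *	test_time_base = fr_time_add(test_time_base, fr_time_delta_from_sec(2));
 * @endcode
 */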
66
67#ifndef NDEBUG
68/** The maximum number of state logs to record per request
69 *
70 */
71#define TRUNK_REQUEST_STATE_LOG_MAX 20
72
73/** Trace state machine changes for a particular request
74 *
75 */
76typedef struct {
77 fr_dlist_head_t *log_head; //!< To allow the log entry to remove itself on free.
78 fr_dlist_t entry; //!< Entry in the linked list.
79 trunk_request_state_t from; //!< What state we transitioned from.
80 trunk_request_state_t to; //!< What state we transitioned to.
81
 82 trunk_connection_t *tconn; //!< The connection the request was associated with.
 83 ///< Pointer may now be invalid, do not de-reference.
84
85 uint64_t tconn_id; //!< If the treq was associated with a connection
86 ///< the connection ID.
87 trunk_connection_state_t tconn_state; //!< If the treq was associated with a connection
88 ///< the connection state at the time of the
89 ///< state transition.
90
91 char const *function; //!< State change occurred in.
92 int line; //!< Line change occurred on.
 93} trunk_request_state_log_entry_t;
 94#endif
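/* Illustrative note, not part of the original file: the entries above form a
 * per-request trace of recent state transitions (capped at
 * TRUNK_REQUEST_STATE_LOG_MAX entries).  They are what REQUEST_BAD_STATE_TRANSITION()
 * dumps further down via trunk_request_state_log(&default_log, L_ERR, __FILE__,
 * __LINE__, treq) when an invalid transition is asserted.
 */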
95
96/** Wraps a normal request
97 *
98 */
 99struct trunk_request_s {
 100 struct trunk_request_pub_s pub; //!< Public fields in the trunk request.
101 ///< This *MUST* be the first field in this
102 ///< structure.
103
104 uint64_t id; //!< Trunk request ID.
105
 106 fr_heap_index_t heap_id; //!< Used to track the request in the conn->pending heap.
107
 108 fr_dlist_t entry; //!< Used to track the trunk request in the conn->sent
 109 ///< list or the trunk->backlog.
110
111 trunk_cancel_reason_t cancel_reason; //!< Why this request was cancelled.
112
113 fr_time_t last_freed; //!< Last time this request was freed.
114
115 bool bound_to_conn; //!< Fail the request if there's an attempt to
116 ///< re-enqueue it.
117
118 bool sent; //!< Trunk request has been sent at least once.
119 ///< Used so that re-queueing doesn't increase trunk
120 ///< `sent` count.
121
122#ifndef NDEBUG
123 fr_dlist_head_t log; //!< State change log.
124#endif
125};
126
127
128/** Associates request queues with a connection
129 *
130 * @dotfile src/lib/server/trunk_conn.gv "Trunk connection state machine"
131 * @dotfile src/lib/server/trunk_req.gv "Trunk request state machine"
132 */
 133struct trunk_connection_s {
 134 struct trunk_connection_pub_s pub; //!< Public fields in the trunk connection.
135 ///< This *MUST* be the first field in this
136 ///< structure.
137
138 fr_heap_index_t heap_id; //!< Used to track the connection in the connected
139 ///< heap.
140
141 fr_dlist_t entry; //!< Used to track the connection in the connecting,
142 ///< full and failed lists.
143
144 /** @name State
145 * @{
146 */
147 trunk_connection_event_t events; //!< The current events we expect to be notified on.
148 /** @} */
149
150 /** @name Request lists
151 * @{
152 */
153 fr_heap_t *pending; //!< Requests waiting to be sent.
154
155 trunk_request_t *partial; //!< Partially written request.
156
 157 fr_dlist_head_t sent; //!< Sent requests.
158
 159 fr_dlist_head_t reapable; //!< Idle requests.
160
161 fr_dlist_head_t cancel; //!< Requests in the cancel state.
162
163 trunk_request_t *cancel_partial; //!< Partially written cancellation request.
164
 165 fr_dlist_head_t cancel_sent; //!< Sent cancellation requests.
166 /** @} */
167
168 /** @name Statistics
169 * @{
170 */
171 uint64_t sent_count; //!< The number of requests that have been sent using
172 ///< this connection.
173 /** @} */
174
175 /** @name Timers
176 * @{
177 */
178 fr_timer_t *lifetime_ev; //!< Maximum time this connection can be open.
179 /** @} */
180};
181
182/** An entry in a trunk watch function list
183 *
184 */
185typedef struct trunk_watch_entry_s {
186 fr_dlist_t entry; //!< List entry.
187 trunk_watch_t func; //!< Function to call when a trunk enters
188 ///< the state this list belongs to
189 bool oneshot; //!< Remove the function after it's called once.
190 bool enabled; //!< Whether the watch entry is enabled.
 191 void *uctx; //!< User data to pass to the function.
 192} trunk_watch_entry_t;
193
194/** Map connection states to trigger names
195 *
196 * Must stay in the same order as #trunk_connection_state_t
197 */
199 { L("pool.connection_halted"), TRUNK_CONN_HALTED }, /* 0x0000 - bit 0 */
200 { L("pool.connection_init"), TRUNK_CONN_INIT }, /* 0x0001 - bit 1 */
201 { L("pool.connection_connecting"), TRUNK_CONN_CONNECTING }, /* 0x0002 - bit 2 */
202 { L("pool.connection_active"), TRUNK_CONN_ACTIVE }, /* 0x0004 - bit 3 */
203 { L("pool.connection_closed"), TRUNK_CONN_CLOSED }, /* 0x0008 - bit 4 */
204 { L("pool.connection_full"), TRUNK_CONN_FULL }, /* 0x0010 - bit 5 */
205 { L("pool.connection_inactive"), TRUNK_CONN_INACTIVE }, /* 0x0020 - bit 6 */
206 { L("pool.connection_inactive_draining"), TRUNK_CONN_INACTIVE_DRAINING }, /* 0x0040 - bit 7 */
207 { L("pool.connection_draining"), TRUNK_CONN_DRAINING }, /* 0x0080 - bit 8 */
208 { L("pool.connection_draining_to_free"), TRUNK_CONN_DRAINING_TO_FREE } /* 0x0100 - bit 9 */
209};
211
212/** Main trunk management handle
213 *
214 */
215struct trunk_s {
 216 struct trunk_pub_s pub; //!< Public fields in the trunk.
217 ///< This *MUST* be the first field in this
218 ///< structure.
219
220 char const *log_prefix; //!< What to prepend to messages.
221
222 fr_event_list_t *el; //!< Event list used by this trunk and the connection.
223
224 trunk_conf_t conf; //!< Trunk common configuration.
225
226 fr_dlist_head_t free_requests; //!< Requests in the unassigned state. Waiting to be
227 ///< enqueued.
228
229 fr_heap_t *backlog; //!< The request backlog. Requests we couldn't
230 ///< immediately assign to a connection.
231
232 /** @name Connection lists
233 *
234 * A connection must always be in exactly one of these lists
235 * or trees.
236 *
237 * @{
238 */
239 fr_dlist_head_t init; //!< Connections which have not yet started
240 ///< connecting.
241
242 fr_dlist_head_t connecting; //!< Connections which are not yet in the open state.
243
244 fr_minmax_heap_t *active; //!< Connections which can service requests.
245
246 fr_dlist_head_t full; //!< Connections which have too many outstanding
247 ///< requests.
248
249 fr_dlist_head_t inactive; //!< Connections which have been signalled to be
250 ///< inactive by the API client.
251
252 fr_dlist_head_t inactive_draining; //!< Connections which have been signalled to be
253 ///< inactive by the API client, which the trunk
254 ///< manager is draining to close.
255
256 fr_dlist_head_t failed; //!< Connections that'll be reconnected shortly.
257
258 fr_dlist_head_t closed; //!< Connections that have closed. Either due to
259 ///< shutdown, reconnection or failure.
260
261 fr_dlist_head_t draining; //!< Connections that will be freed once all their
262 ///< requests are complete, but can be reactivated.
263
264 fr_dlist_head_t draining_to_free; //!< Connections that will be freed once all their
265 ///< requests are complete.
266
 267 fr_dlist_head_t to_free; //!< Connections we're done with and will free on
 268 ///< the next call to trunk_manage.
 269 ///< This prevents connections from being freed
 270 ///< whilst we're inside callbacks.
271 /** @} */
272
273 /** @name Callbacks
274 * @{
275 */
276 trunk_io_funcs_t funcs; //!< I/O functions.
277
278 void *in_handler; //!< Which handler we're inside.
279
280 void *uctx; //!< Uctx data to pass to alloc.
281
282 fr_dlist_head_t watch[TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
283
284 trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
285 /** @} */
286
287 /** @name Timers
288 * @{
289 */
290 fr_timer_t *manage_ev; //!< Periodic connection management event.
291 /** @} */
292
293 /** @name Log rate limiting entries
294 * @{
295 */
296 fr_rate_limit_t limit_max_requests_alloc_log; //!< Rate limit on "Refusing to alloc requests - Limit of * requests reached"
297
298 fr_rate_limit_t limit_last_failure_log; //!< Rate limit on "Refusing to enqueue requests - No active conns"
299 /** @} */
300
301 /** @name State
302 * @{
303 */
304 bool freeing; //!< Trunk is being freed, don't spawn new
305 ///< connections or re-enqueue.
306
307 bool started; //!< Has the trunk been started.
308
309 bool managing_connections; //!< Whether the trunk is allowed to manage
310 ///< (open/close) connections.
311
312 uint64_t last_req_per_conn; //!< The last request to connection ratio we calculated.
313 /** @} */
314
315 fr_pair_list_t *trigger_args; //!< Passed to trigger
316
317 bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]; //!< Record that a specific trigger is undefined.
318
320};
321
322int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule);
323
325 { FR_CONF_OFFSET("per_connection_max", trunk_conf_t, max_req_per_conn), .dflt = "2000" },
326 { FR_CONF_OFFSET("per_connection_target", trunk_conf_t, target_req_per_conn), .dflt = "1000" },
327 { FR_CONF_OFFSET("free_delay", trunk_conf_t, req_cleanup_delay), .dflt = "10.0" },
328 { FR_CONF_OFFSET("triggers", trunk_conf_t, req_triggers), .func = trunk_trigger_cf_parse },
329
 330 CONF_PARSER_TERMINATOR
 331};
332
334 { FR_CONF_OFFSET("connect_timeout", connection_conf_t, connection_timeout), .dflt = "3.0" },
335 { FR_CONF_OFFSET("reconnect_delay", connection_conf_t, reconnection_delay), .dflt = "1" },
336
 337 CONF_PARSER_TERMINATOR
 338};
339
340#ifndef TRUNK_TESTS
342 { FR_CONF_OFFSET("start", trunk_conf_t, start), .dflt = "1" },
343 { FR_CONF_OFFSET("min", trunk_conf_t, min), .dflt = "1" },
344 { FR_CONF_OFFSET("max", trunk_conf_t, max), .dflt = "5" },
345 { FR_CONF_OFFSET("connecting", trunk_conf_t, connecting), .dflt = "2" },
346 { FR_CONF_OFFSET("uses", trunk_conf_t, max_uses), .dflt = "0" },
347 { FR_CONF_OFFSET("lifetime", trunk_conf_t, lifetime), .dflt = "0" },
348 { FR_CONF_OFFSET("idle_timeout", trunk_conf_t, idle_timeout), .dflt = "0" },
349
350 { FR_CONF_OFFSET("open_delay", trunk_conf_t, open_delay), .dflt = "0.2" },
351 { FR_CONF_OFFSET("close_delay", trunk_conf_t, close_delay), .dflt = "10.0" },
352
353 { FR_CONF_OFFSET("manage_interval", trunk_conf_t, manage_interval), .dflt = "0.2" },
354
355 { FR_CONF_OFFSET("max_backlog", trunk_conf_t, max_backlog), .dflt = "1000" },
356
357 { FR_CONF_OFFSET("triggers", trunk_conf_t, conn_triggers), .func = trunk_trigger_cf_parse },
358
359 { FR_CONF_OFFSET_SUBSECTION("connection", 0, trunk_conf_t, conn_conf, trunk_config_connection), .subcs_size = sizeof(trunk_config_connection) },
360 { FR_CONF_POINTER("request", 0, CONF_FLAG_SUBSECTION, NULL), .subcs = (void const *) trunk_config_request },
361
 362 CONF_PARSER_TERMINATOR
 363};
364#endif
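/* Illustrative sketch, not part of the original file: the parser tables above map a
 * module's trunk configuration onto trunk_conf_t.  The enclosing section name depends
 * on the module using this API; the option names and defaults below are taken from
 * the entries above.
 *
 * @code{.unparsed}
 *	start = 1
 *	min = 1
 *	max = 5
 *	connecting = 2
 *	uses = 0
 *	lifetime = 0
 *	idle_timeout = 0
 *	open_delay = 0.2
 *	close_delay = 10.0
 *	manage_interval = 0.2
 *	max_backlog = 1000
 *	connection {
 *		connect_timeout = 3.0
 *		reconnect_delay = 1
 *	}
 *	request {
 *		per_connection_max = 2000
 *		per_connection_target = 1000
 *		free_delay = 10.0
 *	}
 * @endcode
 */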
365
366#ifndef NDEBUG
367/** Map request states to trigger names
368 *
 369 * Must stay in the same order as #trunk_request_state_t
 370 */
 371static fr_table_num_ordered_t const trunk_req_trigger_names[] = {
 372 { L("pool.request_init"), TRUNK_REQUEST_STATE_INIT }, /* 0x0000 - bit 0 */
373 { L("pool.request_unassigned"), TRUNK_REQUEST_STATE_UNASSIGNED }, /* 0x0001 - bit 1 */
374 { L("pool.request_backlog"), TRUNK_REQUEST_STATE_BACKLOG }, /* 0x0002 - bit 2 */
375 { L("pool.request_pending"), TRUNK_REQUEST_STATE_PENDING }, /* 0x0004 - bit 3 */
376 { L("pool.request_partial"), TRUNK_REQUEST_STATE_PARTIAL }, /* 0x0008 - bit 4 */
377 { L("pool.request_sent"), TRUNK_REQUEST_STATE_SENT }, /* 0x0010 - bit 5 */
378 { L("pool.request_state_reapable"), TRUNK_REQUEST_STATE_REAPABLE }, /* 0x0020 - bit 6 */
379 { L("pool.request_complete"), TRUNK_REQUEST_STATE_COMPLETE }, /* 0x0040 - bit 7 */
380 { L("pool.request_state_failed"), TRUNK_REQUEST_STATE_FAILED }, /* 0x0080 - bit 8 */
381 { L("pool.request_state_cancel"), TRUNK_REQUEST_STATE_CANCEL }, /* 0x0100 - bit 9 */
382 { L("pool.request_state_cancel_sent"), TRUNK_REQUEST_STATE_CANCEL_SENT }, /* 0x0200 - bit 10 */
383 { L("pool.request_state_cancel_partial"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL }, /* 0x0400 - bit 11 */
384 { L("pool.request_state_cancel_complete"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }, /* 0x0800 - bit 12 */
385};
387#endif
388
390 { L("INIT"), TRUNK_REQUEST_STATE_INIT },
391 { L("UNASSIGNED"), TRUNK_REQUEST_STATE_UNASSIGNED },
392 { L("BACKLOG"), TRUNK_REQUEST_STATE_BACKLOG },
393 { L("PENDING"), TRUNK_REQUEST_STATE_PENDING },
394 { L("PARTIAL"), TRUNK_REQUEST_STATE_PARTIAL },
395 { L("SENT"), TRUNK_REQUEST_STATE_SENT },
396 { L("REAPABLE"), TRUNK_REQUEST_STATE_REAPABLE },
397 { L("COMPLETE"), TRUNK_REQUEST_STATE_COMPLETE },
398 { L("FAILED"), TRUNK_REQUEST_STATE_FAILED },
399 { L("CANCEL"), TRUNK_REQUEST_STATE_CANCEL },
400 { L("CANCEL-SENT"), TRUNK_REQUEST_STATE_CANCEL_SENT },
401 { L("CANCEL-PARTIAL"), TRUNK_REQUEST_STATE_CANCEL_PARTIAL },
402 { L("CANCEL-COMPLETE"), TRUNK_REQUEST_STATE_CANCEL_COMPLETE }
403};
405
407 { L("IDLE"), TRUNK_STATE_IDLE },
408 { L("ACTIVE"), TRUNK_STATE_ACTIVE },
409 { L("PENDING"), TRUNK_STATE_PENDING }
410};
412
414 { L("INIT"), TRUNK_CONN_INIT },
415 { L("HALTED"), TRUNK_CONN_HALTED },
416 { L("CONNECTING"), TRUNK_CONN_CONNECTING },
417 { L("ACTIVE"), TRUNK_CONN_ACTIVE },
418 { L("CLOSED"), TRUNK_CONN_CLOSED },
419 { L("FULL"), TRUNK_CONN_FULL },
420 { L("INACTIVE"), TRUNK_CONN_INACTIVE },
421 { L("INACTIVE-DRAINING"), TRUNK_CONN_INACTIVE_DRAINING },
422 { L("DRAINING"), TRUNK_CONN_DRAINING },
423 { L("DRAINING-TO-FREE"), TRUNK_CONN_DRAINING_TO_FREE }
424};
426
428 { L("TRUNK_CANCEL_REASON_NONE"), TRUNK_CANCEL_REASON_NONE },
429 { L("TRUNK_CANCEL_REASON_SIGNAL"), TRUNK_CANCEL_REASON_SIGNAL },
430 { L("TRUNK_CANCEL_REASON_MOVE"), TRUNK_CANCEL_REASON_MOVE },
431 { L("TRUNK_CANCEL_REASON_REQUEUE"), TRUNK_CANCEL_REASON_REQUEUE }
432};
434
436 { L("TRUNK_CONN_EVENT_NONE"), TRUNK_CONN_EVENT_NONE },
437 { L("TRUNK_CONN_EVENT_READ"), TRUNK_CONN_EVENT_READ },
438 { L("TRUNK_CONN_EVENT_WRITE"), TRUNK_CONN_EVENT_WRITE },
439 { L("TRUNK_CONN_EVENT_BOTH"), TRUNK_CONN_EVENT_BOTH },
440};
442
443#define CONN_TRIGGER(_state) do { \
444 uint8_t idx = fr_high_bit_pos(_state); \
445 if (trunk->conf.conn_triggers && !trunk->trigger_undef[idx]) { \
446 if (trigger(unlang_interpret_get_thread_default(), trunk->conf.conn_trigger_cs, \
447 &trunk->trigger_cp[idx], \
448 fr_table_str_by_value(trunk_conn_trigger_names, _state, \
449 "<INVALID>"), true, trunk->trigger_args) == -1) { \
450 trunk->trigger_undef[idx] = true; \
451 } \
452 } \
453} while (0)
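/* Illustrative note, not part of the original file: fr_high_bit_pos() turns the
 * single-bit connection state into a small array index matching the "bit N"
 * annotations in trunk_conn_trigger_names above (e.g. TRUNK_CONN_ACTIVE, 0x0004,
 * indexes slot 3).  If trigger() reports a trigger as undefined (returns -1) the
 * slot is latched in trigger_undef[] so that trigger is never attempted again.
 */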
454
455#define CONN_STATE_TRANSITION(_new, _log) \
456do { \
457 _log("[%" PRIu64 "] Trunk connection changed state %s -> %s", \
458 tconn->pub.conn->id, \
459 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
460 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>")); \
461 tconn->pub.state = _new; \
462 CONN_TRIGGER(_new); \
463 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false); \
464} while (0)
465
466#define CONN_BAD_STATE_TRANSITION(_new) \
467do { \
468 if (!fr_cond_assert_msg(0, "[%" PRIu64 "] Trunk connection invalid transition %s -> %s", \
469 tconn->pub.conn->id, \
470 fr_table_str_by_value(trunk_connection_states, tconn->pub.state, "<INVALID>"), \
471 fr_table_str_by_value(trunk_connection_states, _new, "<INVALID>"))) return; \
472} while (0)
473
474#ifndef NDEBUG
475void trunk_request_state_log_entry_add(char const *function, int line,
476 trunk_request_t *treq, trunk_request_state_t new) CC_HINT(nonnull);
477
478#define REQUEST_TRIGGER(_state) do { \
479 if (trunk->conf.req_triggers) { \
480 trigger(unlang_interpret_get_thread_default(), \
481 trunk->conf.req_trigger_cs, NULL, fr_table_str_by_value(trunk_req_trigger_names, _state, \
482 "<INVALID>"), true, trunk->trigger_args); \
483 } \
484} while (0)
485
486/** Record a request state transition and log appropriate output
487 *
488 */
489#define REQUEST_STATE_TRANSITION(_new) \
490do { \
491 request_t *request = treq->pub.request; \
492 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
493 treq->id, \
494 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
495 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
496 trunk_request_state_log_entry_add(__FUNCTION__, __LINE__, treq, _new); \
497 treq->pub.state = _new; \
498 REQUEST_TRIGGER(_new); \
499} while (0)
500#define REQUEST_BAD_STATE_TRANSITION(_new) \
501do { \
502 trunk_request_state_log(&default_log, L_ERR, __FILE__, __LINE__, treq); \
503 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
504 treq->id, \
505 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
506 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
507} while (0)
508#else
509/** Record a request state transition
510 *
511 */
512#define REQUEST_STATE_TRANSITION(_new) \
513do { \
514 request_t *request = treq->pub.request; \
515 ROPTIONAL(RDEBUG3, DEBUG3, "Trunk request %" PRIu64 " changed state %s -> %s", \
516 treq->id, \
517 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
518 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>")); \
519 treq->pub.state = _new; \
520} while (0)
521#define REQUEST_BAD_STATE_TRANSITION(_new) \
522do { \
523 if (!fr_cond_assert_msg(0, "Trunk request %" PRIu64 " invalid transition %s -> %s", \
524 treq->id, \
525 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"), \
526 fr_table_str_by_value(trunk_request_states, _new, "<INVALID>"))) return; \
527} while (0)
528#endif
529
530
531/** Call the cancel callback if set
532 *
533 */
534#define DO_REQUEST_CANCEL(_treq, _reason) \
535do { \
536 if ((_treq)->pub.trunk->funcs.request_cancel) { \
537 request_t *request = (_treq)->pub.request; \
538 void *_prev = (_treq)->pub.trunk->in_handler; \
539 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_cancel; \
540 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_cancel(conn=%p, preq=%p, reason=%s, uctx=%p)", \
541 (_treq)->pub.tconn->pub.conn, \
542 (_treq)->pub.preq, \
543 fr_table_str_by_value(trunk_cancellation_reasons, \
544 (_reason), \
545 "<INVALID>"), \
546 (_treq)->pub.trunk->uctx); \
547 (_treq)->pub.trunk->funcs.request_cancel((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_reason), (_treq)->pub.trunk->uctx); \
548 (_treq)->pub.trunk->in_handler = _prev; \
549 } \
550} while(0)
551
552/** Call the "conn_release" callback (if set)
553 *
554 */
555#define DO_REQUEST_CONN_RELEASE(_treq) \
556do { \
557 if ((_treq)->pub.trunk->funcs.request_conn_release) { \
558 request_t *request = (_treq)->pub.request; \
559 void *_prev = (_treq)->pub.trunk->in_handler; \
560 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_conn_release; \
561 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_conn_release(conn=%p, preq=%p, uctx=%p)", \
562 (_treq)->pub.tconn->pub.conn, \
563 (_treq)->pub.preq, \
564 (_treq)->pub.trunk->uctx); \
565 (_treq)->pub.trunk->funcs.request_conn_release((_treq)->pub.tconn->pub.conn, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
566 (_treq)->pub.trunk->in_handler = _prev; \
567 } \
568} while(0)
569
570/** Call the complete callback (if set)
571 *
572 */
573#define DO_REQUEST_COMPLETE(_treq) \
574do { \
575 if ((_treq)->pub.trunk->funcs.request_complete) { \
576 request_t *request = (_treq)->pub.request; \
577 void *_prev = (_treq)->pub.trunk->in_handler; \
578 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_complete(request=%p, preq=%p, rctx=%p, uctx=%p)", \
579 (_treq)->pub.request, \
580 (_treq)->pub.preq, \
581 (_treq)->pub.rctx, \
582 (_treq)->pub.trunk->uctx); \
583 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_complete; \
584 (_treq)->pub.trunk->funcs.request_complete((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, (_treq)->pub.trunk->uctx); \
585 (_treq)->pub.trunk->in_handler = _prev; \
586 } \
587} while(0)
588
589/** Call the fail callback (if set)
590 *
591 */
592#define DO_REQUEST_FAIL(_treq, _prev_state) \
593do { \
594 if ((_treq)->pub.trunk->funcs.request_fail) { \
595 request_t *request = (_treq)->pub.request; \
596 void *_prev = (_treq)->pub.trunk->in_handler; \
597 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_fail(request=%p, preq=%p, rctx=%p, state=%s uctx=%p)", \
598 (_treq)->pub.request, \
599 (_treq)->pub.preq, \
600 (_treq)->pub.rctx, \
601 fr_table_str_by_value(trunk_request_states, (_prev_state), "<INVALID>"), \
602 (_treq)->pub.trunk->uctx); \
603 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_fail; \
604 (_treq)->pub.trunk->funcs.request_fail((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.rctx, _prev_state, (_treq)->pub.trunk->uctx); \
605 (_treq)->pub.trunk->in_handler = _prev; \
606 } \
607} while(0)
608
609/** Call the free callback (if set)
610 *
611 */
612#define DO_REQUEST_FREE(_treq) \
613do { \
614 if ((_treq)->pub.trunk->funcs.request_free) { \
615 request_t *request = (_treq)->pub.request; \
616 void *_prev = (_treq)->pub.trunk->in_handler; \
617 ROPTIONAL(RDEBUG3, DEBUG3, "Calling request_free(request=%p, preq=%p, uctx=%p)", \
618 (_treq)->pub.request, \
619 (_treq)->pub.preq, \
620 (_treq)->pub.trunk->uctx); \
621 (_treq)->pub.trunk->in_handler = (void *)(_treq)->pub.trunk->funcs.request_free; \
622 (_treq)->pub.trunk->funcs.request_free((_treq)->pub.request, (_treq)->pub.preq, (_treq)->pub.trunk->uctx); \
623 (_treq)->pub.trunk->in_handler = _prev; \
624 } \
625} while(0)
626
627/** Write one or more requests to a connection
628 *
629 */
630#define DO_REQUEST_MUX(_tconn) \
631do { \
632 void *_prev = (_tconn)->pub.trunk->in_handler; \
633 DEBUG3("[%" PRIu64 "] Calling request_mux(el=%p, tconn=%p, conn=%p, uctx=%p)", \
634 (_tconn)->pub.conn->id, \
635 (_tconn)->pub.trunk->el, \
636 (_tconn), \
637 (_tconn)->pub.conn, \
638 (_tconn)->pub.trunk->uctx); \
639 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_mux; \
640 (_tconn)->pub.trunk->funcs.request_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
641 (_tconn)->pub.trunk->in_handler = _prev; \
642} while(0)
643
644/** Read one or more requests from a connection
645 *
646 */
647#define DO_REQUEST_DEMUX(_tconn) \
648do { \
649 void *_prev = (_tconn)->pub.trunk->in_handler; \
650 DEBUG3("[%" PRIu64 "] Calling request_demux(tconn=%p, conn=%p, uctx=%p)", \
651 (_tconn)->pub.conn->id, \
652 (_tconn), \
653 (_tconn)->pub.conn, \
654 (_tconn)->pub.trunk->uctx); \
655 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_demux; \
656 (_tconn)->pub.trunk->funcs.request_demux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
657 (_tconn)->pub.trunk->in_handler = _prev; \
658} while(0)
659
660/** Write one or more cancellation requests to a connection
661 *
662 */
663#define DO_REQUEST_CANCEL_MUX(_tconn) \
664do { \
665 if ((_tconn)->pub.trunk->funcs.request_cancel_mux) { \
666 void *_prev = (_tconn)->pub.trunk->in_handler; \
667 DEBUG3("[%" PRIu64 "] Calling request_cancel_mux(tconn=%p, conn=%p, uctx=%p)", \
668 (_tconn)->pub.conn->id, \
669 (_tconn), \
670 (_tconn)->pub.conn, \
671 (_tconn)->pub.trunk->uctx); \
672 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.request_cancel_mux; \
673 (_tconn)->pub.trunk->funcs.request_cancel_mux((_tconn)->pub.trunk->el, (_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->uctx); \
674 (_tconn)->pub.trunk->in_handler = _prev; \
675 } \
676} while(0)
677
678/** Allocate a new connection
679 *
680 */
681#define DO_CONNECTION_ALLOC(_tconn) \
682do { \
683 void *_prev = trunk->in_handler; \
684 DEBUG3("Calling connection_alloc(tconn=%p, el=%p, conf=%p, log_prefix=\"%s\", uctx=%p)", \
685 (_tconn), \
686 (_tconn)->pub.trunk->el, \
687 (_tconn)->pub.trunk->conf.conn_conf, \
688 trunk->log_prefix, \
689 (_tconn)->pub.trunk->uctx); \
690 (_tconn)->pub.trunk->in_handler = (void *) (_tconn)->pub.trunk->funcs.connection_alloc; \
691 (_tconn)->pub.conn = trunk->funcs.connection_alloc((_tconn), (_tconn)->pub.trunk->el, (_tconn)->pub.trunk->conf.conn_conf, (_tconn)->pub.trunk->log_prefix, trunk->uctx); \
692 (_tconn)->pub.trunk->in_handler = _prev; \
693 if (!(_tconn)->pub.conn) { \
694 ERROR("Failed creating new connection"); \
695 talloc_free(tconn); \
696 return -1; \
697 } \
698} while(0)
699
700/** Change what events the connection should be notified about
701 *
702 */
703#define DO_CONNECTION_NOTIFY(_tconn, _events) \
704do { \
705 if ((_tconn)->pub.trunk->funcs.connection_notify) { \
706 void *_prev = (_tconn)->pub.trunk->in_handler; \
707 DEBUG3("[%" PRIu64 "] Calling connection_notify(tconn=%p, conn=%p, el=%p, events=%s, uctx=%p)", \
708 (_tconn)->pub.conn->id, \
709 (_tconn), \
710 (_tconn)->pub.conn, \
711 (_tconn)->pub.trunk->el, \
712 fr_table_str_by_value(trunk_connection_events, (_events), "<INVALID>"), \
713 (_tconn)->pub.trunk->uctx); \
714 (_tconn)->pub.trunk->in_handler = (void *)(_tconn)->pub.trunk->funcs.connection_notify; \
715 (_tconn)->pub.trunk->funcs.connection_notify((_tconn), (_tconn)->pub.conn, (_tconn)->pub.trunk->el, (_events), (_tconn)->pub.trunk->uctx); \
716 (_tconn)->pub.trunk->in_handler = _prev; \
717 } \
718} while(0)
719
720#define IN_HANDLER(_trunk) (((_trunk)->in_handler) != NULL)
721#define IN_REQUEST_MUX(_trunk) (((_trunk)->funcs.request_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_mux))
722#define IN_REQUEST_DEMUX(_trunk) (((_trunk)->funcs.request_demux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_demux))
723#define IN_REQUEST_CANCEL_MUX(_trunk) (((_trunk)->funcs.request_cancel_mux) && ((_trunk)->in_handler == (void *)(_trunk)->funcs.request_cancel_mux))
724
725#define IS_SERVICEABLE(_tconn) ((_tconn)->pub.state & TRUNK_CONN_SERVICEABLE)
 726#define IS_PROCESSING(_tconn) ((_tconn)->pub.state & TRUNK_CONN_PROCESSING)
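/* Illustrative sketch, not part of the original file: the in_handler pointer saved
 * and restored by the DO_* macros above is what these guards test, letting code in
 * this file detect that it is being re-entered from inside a specific callback and
 * defer work accordingly, e.g. (hypothetical):
 *
 * @code
 *	if (IN_REQUEST_MUX(trunk)) return;	// already inside request_mux, defer
 * @endcode
 */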
727
728/** Remove the current request from the backlog
729 *
730 */
731#define REQUEST_EXTRACT_BACKLOG(_treq) \
732do { \
733 int _ret; \
734 _ret = fr_heap_extract(&(_treq)->pub.trunk->backlog, _treq); \
735 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from backlog heap: %s", fr_strerror())) break; \
736} while (0)
737
738/** Remove the current request from the pending list
739 *
740 */
741#define REQUEST_EXTRACT_PENDING(_treq) \
742do { \
743 int _ret; \
744 _ret = fr_heap_extract(&(_treq)->pub.tconn->pending, _treq); \
745 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from pending heap: %s", fr_strerror())) break; \
746} while (0)
747
748/** Remove the current request from the partial slot
749 *
750 */
751#define REQUEST_EXTRACT_PARTIAL(_treq) \
752do { \
753 fr_assert((_treq)->pub.tconn->partial == treq); \
754 tconn->partial = NULL; \
755} while (0)
756
757/** Remove the current request from the sent list
758 *
759 */
760#define REQUEST_EXTRACT_SENT(_treq) fr_dlist_remove(&tconn->sent, treq)
761
762/** Remove the current request from the reapable list
763 *
764 */
765#define REQUEST_EXTRACT_REAPABLE(_treq) fr_dlist_remove(&tconn->reapable, treq)
766
767/** Remove the current request from the cancel list
768 *
769 */
770#define REQUEST_EXTRACT_CANCEL(_treq) fr_dlist_remove(&tconn->cancel, treq)
771
772/** Remove the current request from the cancel_partial slot
773 *
774 */
775#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq) \
776do { \
777 fr_assert((_treq)->pub.tconn->cancel_partial == treq); \
778 tconn->cancel_partial = NULL; \
779} while (0)
780
781/** Remove the current request from the cancel sent list
782 *
783 */
784#define REQUEST_EXTRACT_CANCEL_SENT(_treq) fr_dlist_remove(&tconn->cancel_sent, treq)
785
786/** Reorder the connections in the active heap
787 *
788 * fr_heap_extract will also error out if heap_id is bad - no need for assert
789 */
790#define CONN_REORDER(_tconn) \
791do { \
792 int _ret; \
793 if ((fr_minmax_heap_num_elements((_tconn)->pub.trunk->active) == 1)) break; \
794 if (!fr_cond_assert((_tconn)->pub.state == TRUNK_CONN_ACTIVE)) break; \
795 _ret = fr_minmax_heap_extract((_tconn)->pub.trunk->active, (_tconn)); \
796 if (!fr_cond_assert_msg(_ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) break; \
797 fr_minmax_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
798} while (0)
799
800/** Call a list of watch functions associated with a state
801 *
802 */
804{
805 /*
806 * Nested watcher calls are not allowed
807 * and shouldn't be possible because of
808 * deferred signal processing.
809 */
810 fr_assert(trunk->next_watcher == NULL);
811
812 while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
813 trunk_watch_entry_t *entry = trunk->next_watcher;
814 bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
815
816 if (!entry->enabled) continue;
817 if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
818
819 entry->func(trunk, trunk->pub.state, state, entry->uctx);
820
821 if (oneshot) talloc_free(entry);
822 }
823 trunk->next_watcher = NULL;
824}
825
826/** Call the state change watch functions
827 *
828 */
829#define CALL_WATCHERS(_trunk, _state) \
830do { \
831 if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
832 trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
833} while(0)
834
835/** Remove a watch function from a trunk state list
836 *
837 * @param[in] trunk The trunk to remove the watcher from.
838 * @param[in] state to remove the watch from.
839 * @param[in] watch Function to remove.
840 * @return
841 * - 0 if the function was removed successfully.
842 * - -1 if the function wasn't present in the watch list.
843 * - -2 if an invalid state was passed.
844 */
846{
847 trunk_watch_entry_t *entry = NULL;
848 fr_dlist_head_t *list;
849
850 if (state >= TRUNK_STATE_MAX) return -2;
851
852 list = &trunk->watch[state];
853 while ((entry = fr_dlist_next(list, entry))) {
854 if (entry->func == watch) {
855 if (trunk->next_watcher == entry) {
856 trunk->next_watcher = fr_dlist_remove(list, entry);
857 } else {
858 fr_dlist_remove(list, entry);
859 }
860 talloc_free(entry);
861 return 0;
862 }
863 }
864
865 return -1;
866}
867
868/** Add a watch entry to the trunk state list
869 *
870 * @param[in] trunk The trunk to add the watcher to.
871 * @param[in] state to watch for.
872 * @param[in] watch Function to add.
873 * @param[in] oneshot Should this watcher only be run once.
874 * @param[in] uctx Context to pass to function.
875 * @return
876 * - NULL if an invalid state is passed.
877 * - A new watch entry handle on success.
878 */
880 trunk_watch_t watch, bool oneshot, void const *uctx)
881{
882 trunk_watch_entry_t *entry;
883 fr_dlist_head_t *list;
884
885 if (state >= TRUNK_STATE_MAX) return NULL;
886
887 list = &trunk->watch[state];
888 MEM(entry = talloc_zero(trunk, trunk_watch_entry_t));
889
890 entry->func = watch;
891 entry->oneshot = oneshot;
892 entry->enabled = true;
893 memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
894 fr_dlist_insert_tail(list, entry);
895
896 return entry;
897}
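/* Illustrative sketch, not part of the original file: a watch callback an API client
 * might register against a trunk state.  The parameter types are inferred from how
 * entry->func() is invoked in trunk_watch_call() above; the public registration
 * wrapper callers use is not shown in this file.
 *
 * @code
 *	static void my_on_state_change(trunk_t *t, trunk_state_t prev, trunk_state_t now, void *uctx)
 *	{
 *		DEBUG3("Trunk %p changed state %s -> %s", t,
 *		       fr_table_str_by_value(trunk_states, prev, "<INVALID>"),
 *		       fr_table_str_by_value(trunk_states, now, "<INVALID>"));
 *	}
 * @endcode
 */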
898
899#define TRUNK_STATE_TRANSITION(_new) \
900do { \
901 DEBUG3("Trunk changed state %s -> %s", \
902 fr_table_str_by_value(trunk_states, trunk->pub.state, "<INVALID>"), \
903 fr_table_str_by_value(trunk_states, _new, "<INVALID>")); \
904 CALL_WATCHERS(trunk, _new); \
905 trunk->pub.state = _new; \
906} while (0)
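/* Illustrative note, not part of the original file: CALL_WATCHERS() runs before
 * pub.state is assigned, which is why trunk_watch_call() above can hand each watcher
 * both the old state (still in trunk->pub.state) and the new state.
 */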
907
908static void trunk_request_enter_backlog(trunk_request_t *treq, bool new);
909static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new);
918
919static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out,
920 trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify);
921
922static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now);
923static inline void trunk_connection_auto_full(trunk_connection_t *tconn);
924static inline void trunk_connection_auto_unfull(trunk_connection_t *tconn);
925static inline void trunk_connection_readable(trunk_connection_t *tconn);
926static inline void trunk_connection_writable(trunk_connection_t *tconn);
934
935static void trunk_rebalance(trunk_t *trunk);
936static void trunk_manage(trunk_t *trunk, fr_time_t now);
937static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx);
938static void trunk_backlog_drain(trunk_t *trunk);
939
940/** Compare two protocol requests
941 *
942 * Allows protocol requests to be prioritised with a function
 943 * specified by the API client. Defaults to comparison by pointer address
944 * if no function is specified.
945 *
946 * @param[in] a treq to compare to b.
947 * @param[in] b treq to compare to a.
948 * @return
949 * - +1 if a > b.
950 * - 0 if a == b.
951 * - -1 if a < b.
952 */
953static int8_t _trunk_request_prioritise(void const *a, void const *b)
954{
 955 trunk_request_t const *treq_a = talloc_get_type_abort_const(a, trunk_request_t);
 956 trunk_request_t const *treq_b = talloc_get_type_abort_const(b, trunk_request_t);
 957
958 fr_assert(treq_a->pub.trunk == treq_b->pub.trunk);
959
960 return treq_a->pub.trunk->funcs.request_prioritise(treq_a->pub.preq, treq_b->pub.preq);
961}
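/* Illustrative sketch, not part of the original file: a request_prioritise callback
 * an API client might supply.  The preq type and its priority field are assumptions;
 * the -1/0/+1 contract matches the documentation above.
 *
 * @code
 *	typedef struct {
 *		uint8_t priority;		// application-defined field (assumed)
 *	} my_preq_t;
 *
 *	static int8_t my_prioritise(void const *a, void const *b)
 *	{
 *		my_preq_t const *pa = a, *pb = b;
 *
 *		if (pa->priority > pb->priority) return +1;
 *		if (pa->priority < pb->priority) return -1;
 *		return 0;
 *	}
 * @endcode
 */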
962
963/** Remove a request from all connection lists
964 *
965 * A common function used by init, fail, complete state functions to disassociate
966 * a request from a connection in preparation for freeing or reassignment.
967 *
968 * Despite its unassuming name, this function is *the* place to put calls to
969 * functions which need to be called when the number of requests associated with
970 * a connection changes.
971 *
972 * Trunk requests will always be passed to this function before they're removed
973 * from a connection, even if the requests are being freed.
974 *
975 * @param[in] treq to trigger a state change for.
976 */
978{
979 trunk_connection_t *tconn = treq->pub.tconn;
980 trunk_t *trunk = treq->pub.trunk;
981
982 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
983
984 switch (treq->pub.state) {
986 return; /* Not associated with connection */
987
990 break;
991
994 break;
995
998 break;
999
1002 break;
1003
1006 break;
1007
1010 break;
1011
1014 break;
1015
1016 default:
1017 fr_assert(0);
1018 break;
1019 }
1020
1021 /*
1022 * If the request wasn't associated with a
1023 * connection, then there's nothing more
1024 * to do.
1025 */
1026 if (!tconn) return;
1027
1028 {
1029 request_t *request = treq->pub.request;
1030
1031 ROPTIONAL(RDEBUG3, DEBUG3, "%s Trunk connection released request %" PRIu64,
1032 tconn->pub.conn->name, treq->id);
1033 }
1034 /*
1035 * Release any connection specific resources the
1036 * treq holds.
1037 */
1039
1040 switch (tconn->pub.state){
1041 case TRUNK_CONN_FULL:
1042 trunk_connection_auto_unfull(tconn); /* Check if we can switch back to active */
1043 if (tconn->pub.state == TRUNK_CONN_FULL) break; /* Only fallthrough if conn is now active */
1045
1046 case TRUNK_CONN_ACTIVE:
1047 CONN_REORDER(tconn);
1048 break;
1049
1050 default:
1051 break;
1052 }
1053
1054 treq->pub.tconn = NULL;
1055
1056 /*
1057 * Request removed from the connection
 1058 * see if we need to deregister I/O events.
1059 */
1061}
1062
1063/** Transition a request to the unassigned state, in preparation for re-assignment
1064 *
1065 * @note treq->tconn may be inviable after calling
1066 * if treq->conn and connection_signals_pause are not used.
 1067 * This is due to the call to trunk_request_remove_from_conn.
1068 *
1069 * @param[in] treq to trigger a state change for.
1070 */
1072{
1073 trunk_t *trunk = treq->pub.trunk;
1074
1075 switch (treq->pub.state) {
1077 return;
1078
1081 break;
1082
1088 break;
1089
1090 default:
1092 }
1093
1095}
1096
1097/** Transition a request to the backlog state, adding it to the backlog of the trunk
1098 *
1099 * @note treq->tconn and treq may be inviable after calling
1100 * if treq->conn and connection_signals_pause are not used.
 1101 * This is due to the call to trunk_manage.
1102 *
1103 * @param[in] treq to trigger a state change for.
1104 * @param[in] new Whether this is a new request.
1105 */
1107{
1108 trunk_connection_t *tconn = treq->pub.tconn;
1109 trunk_t *trunk = treq->pub.trunk;
1110
1111 switch (treq->pub.state) {
1114 break;
1115
1118 break;
1119
1122 break;
1123
1124 default:
1126 }
1127
1129 fr_heap_insert(&trunk->backlog, treq); /* Insert into the backlog heap */
1130
1131 /*
1132 * A new request has entered the trunk.
1133 * Re-calculate request/connection ratios.
1134 */
1135 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1136
1137 /*
 1138 * To reduce latency, if there are no connections
 1139 * in the connecting state, call the trunk manage
 1140 * function immediately.
 1141 *
 1142 * Likewise, if there are draining connections
 1143 * which could be moved back to active, call
 1144 * the trunk manage function.
 1145 *
 1146 * Remember requests only enter the backlog if
 1147 * there are no connections which can service them.
1148 */
1152 }
1153}
1154
1155/** Transition a request to the pending state, adding it to the backlog of an active connection
1156 *
1157 * All trunk requests being added to a connection get passed to this function.
1158 * All trunk requests being removed from a connection get passed to #trunk_request_remove_from_conn.
1159 *
1160 * @note treq->tconn and treq may be inviable after calling
1161 * if treq->conn and connection_signals_pause is not used.
 1162 * This is due to the call to trunk_connection_event_update.
1163 *
1164 * @param[in] treq to trigger a state change for.
1165 * @param[in] tconn to enqueue the request on.
1166 * @param[in] new Whether this is a new request.
1167 */
1169{
1170 trunk_t *trunk = treq->pub.trunk;
1171
1172 fr_assert(tconn->pub.trunk == trunk);
1173 fr_assert(IS_PROCESSING(tconn));
1174
1175 switch (treq->pub.state) {
1178 fr_assert(!treq->pub.tconn);
1179 break;
1180
1182 fr_assert(!treq->pub.tconn);
1184 break;
1185
1186 case TRUNK_REQUEST_STATE_CANCEL: /* Moved from another connection */
1188 break;
1189
1190 default:
1192 }
1193
1194 /*
 1195 * Assign the new connection first so
1196 * it appears in the state log.
1197 */
1198 treq->pub.tconn = tconn;
1199
1201
1202 {
1203 request_t *request = treq->pub.request;
1204
1205 ROPTIONAL(RDEBUG, DEBUG3, "%s Trunk connection assigned request %"PRIu64,
1206 tconn->pub.conn->name, treq->id);
1207 }
1208 fr_heap_insert(&tconn->pending, treq);
1209
1210 /*
1211 * A new request has entered the trunk.
1212 * Re-calculate request/connection ratios.
1213 */
1214 if (new) trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
1215
1216 /*
1217 * Check if we need to automatically transition the
1218 * connection to full.
1219 */
1221
1222 /*
1223 * Reorder the connection in the heap now it has an
1224 * additional request.
1225 */
1226 if (tconn->pub.state == TRUNK_CONN_ACTIVE) CONN_REORDER(tconn);
1227
1228 /*
1229 * We have a new request, see if we need to register
1230 * for I/O events.
1231 */
1233}
1234
 1235/** Transition a request to the partial state, indicating that it has been partially sent
1236 *
1237 * @param[in] treq to trigger a state change for.
1238 */
1240{
1241 trunk_connection_t *tconn = treq->pub.tconn;
1242 trunk_t *trunk = treq->pub.trunk;
1243
1244 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1245
1246 switch (treq->pub.state) {
1247 case TRUNK_REQUEST_STATE_PENDING: /* All requests go through pending, even requeued ones */
1249 break;
1250
1251 default:
1253 }
1254
1255 fr_assert(!tconn->partial);
1256 tconn->partial = treq;
1257
1259}
1260
1261/** Transition a request to the sent state, indicating that it's been sent in its entirety
1262 *
1263 * @note treq->tconn and treq may be inviable after calling
1264 * if treq->conn and connection_signals_pause is not used.
 1265 * This is due to the call to trunk_connection_event_update.
1266 *
1267 * @param[in] treq to trigger a state change for.
1268 */
1270{
1271 trunk_connection_t *tconn = treq->pub.tconn;
1272 trunk_t *trunk = treq->pub.trunk;
1273
1274 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1275
1276 switch (treq->pub.state) {
1279 break;
1280
1283 break;
1284
1285 default:
1287 }
1288
1290 fr_dlist_insert_tail(&tconn->sent, treq);
1291
1292 /*
1293 * Update the connection's sent stats if this is the
1294 * first time this request is being sent.
1295 */
1296 if (!treq->sent) {
1297 trunk->pub.last_write_success = fr_time();
1298
1300 tconn->sent_count++;
1301 treq->sent = true;
1302
1303 /*
1304 * Enforces max_uses
1305 */
1306 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1307 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1309 }
1310 }
1311
1312 /*
1313 * We just sent a request, we probably need
1314 * to tell the event loop we want to be
1315 * notified if there's data available.
1316 */
1318}
1319
1320/** Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected
1321 *
1322 * @note Largely a replica of trunk_request_enter_sent.
1323 *
1324 * @param[in] treq to trigger a state change for.
1325 */
1327{
1328 trunk_connection_t *tconn = treq->pub.tconn;
1329 trunk_t *trunk = treq->pub.trunk;
1330
1331 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1332
1333 switch (treq->pub.state) {
1336 break;
1337
1340 break;
1341
1342 default:
1344 }
1345
1347 fr_dlist_insert_tail(&tconn->reapable, treq);
1348
1349 if (!treq->sent) {
1350 tconn->sent_count++;
1351 treq->sent = true;
1352
1353 if ((trunk->conf.max_uses > 0) && (tconn->sent_count >= trunk->conf.max_uses)) {
1354 DEBUG3("Trunk hit max uses %" PRIu64 " at %d", trunk->conf.max_uses, __LINE__);
1356 }
1357 }
1358
1360}
1361
1362/** Transition a request to the cancel state, placing it in a connection's cancellation list
1363 *
1364 * If a request_cancel_send callback is provided, that callback will
1365 * be called periodically for requests which were cancelled due to
1366 * a signal.
1367 *
1368 * The request_cancel_send callback will dequeue cancelled requests
1369 * and inform a remote server that the result is no longer required.
1370 *
1371 * A request must enter this state before being added to the backlog
1372 * of another connection if it's been sent or partially sent.
1373 *
1374 * @note treq->tconn and treq may be inviable after calling
1375 * if treq->conn and connection_signals_pause is not used.
 1376 * This is due to the call to trunk_connection_event_update.
1377 *
1378 * @param[in] treq to trigger a state change for.
1379 * @param[in] reason Why the request was cancelled.
1380 * Should be one of:
1381 * - TRUNK_CANCEL_REASON_SIGNAL request cancelled
1382 * because of a signal from the interpreter.
1383 * - TRUNK_CANCEL_REASON_MOVE request cancelled
1384 * because the connection failed and it needs
1385 * to be assigned to a new connection.
1386 * - TRUNK_CANCEL_REASON_REQUEUE request cancelled
1387 * as it needs to be resent on the same connection.
1388 */
1390{
1391 trunk_connection_t *tconn = treq->pub.tconn;
1392 trunk_t *trunk = treq->pub.trunk;
1393
1394 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1395
1396 switch (treq->pub.state) {
1399 break;
1400
1403 break;
1404
1407 break;
1408
1409 default:
1411 }
1412
1414 fr_dlist_insert_tail(&tconn->cancel, treq);
1415 treq->cancel_reason = reason;
1416
1417 DO_REQUEST_CANCEL(treq, reason);
1418
1419 /*
1420 * Our treq is no longer bound to an actual
1421 * request_t *, as we can't guarantee the
1422 * lifetime of the original request_t *.
1423 */
1424 if (treq->cancel_reason == TRUNK_CANCEL_REASON_SIGNAL) treq->pub.request = NULL;
1425
1426 /*
1427 * Register for I/O write events if we need to.
1428 */
1430}
1431
1432/** Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot
1433 *
1434 * The request_demux function is then responsible for signalling
1435 * that the cancel request is complete when the remote server
1436 * acknowledges the cancellation request.
1437 *
1438 * @param[in] treq to trigger a state change for.
1439 */
1441{
1442 trunk_connection_t *tconn = treq->pub.tconn;
1443 trunk_t *trunk = treq->pub.trunk;
1444
1445 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1448
1449 switch (treq->pub.state) {
1450 case TRUNK_REQUEST_STATE_CANCEL: /* The only valid state cancel_sent can be reached from */
1452 break;
1453
1454 default:
1456 }
1457
1459 fr_assert(!tconn->partial);
1460 tconn->cancel_partial = treq;
1461}
1462
1463/** Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list
1464 *
1465 * The request_demux function is then responsible for signalling
1466 * that the cancel request is complete when the remote server
1467 * acknowledges the cancellation request.
1468 *
1469 * @note treq->tconn and treq may be inviable after calling
1470 * if treq->conn and connection_signals_pause is not used.
 1471 * This is due to the call to trunk_connection_event_update.
1472 *
1473 * @param[in] treq to trigger a state change for.
1474 */
1476{
1477 trunk_connection_t *tconn = treq->pub.tconn;
1478 trunk_t *trunk = treq->pub.trunk;
1479
1480 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1483
1484 switch (treq->pub.state) {
1487 break;
1488
1491 break;
1492
1493 default:
1495 }
1496
1498 fr_dlist_insert_tail(&tconn->cancel_sent, treq);
1499
1500 /*
1501 * De-register for I/O write events
1502 * and register the read events
1503 * to drain the cancel ACKs.
1504 */
1506}
1507
1508/** Cancellation was acked, the request is complete, free it
1509 *
1510 * The API client will not be informed, as the original request_t *
1511 * will likely have been freed by this point.
1512 *
1513 * @note treq will be inviable after a call to this function.
1514 * treq->tconn may be inviable after calling
1515 * if treq->conn and connection_signals_pause is not used.
 1516 * This is due to the call to trunk_request_remove_from_conn.
1517 *
1518 * @param[in] treq to mark as complete.
1519 */
1521{
1522 trunk_connection_t *tconn = treq->pub.tconn;
1523 trunk_t *trunk = treq->pub.trunk;
1524
1525 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1526 if (!fr_cond_assert(!treq->pub.request)) return; /* Only a valid state for request_t * which have been cancelled */
1527
1528 switch (treq->pub.state) {
1531 break;
1532
1533 default:
1535 }
1536
1538
1540 trunk_request_free(&treq); /* Free the request */
1541}
1542
1543/** Request completed successfully, inform the API client and free the request
1544 *
1545 * @note treq will be inviable after a call to this function.
 1546 * treq->tconn may also be inviable due to the call to
1547 * trunk_request_remove_from_conn.
1548 *
1549 * @param[in] treq to mark as complete.
1550 */
1552{
1553 trunk_connection_t *tconn = treq->pub.tconn;
1554 trunk_t *trunk = treq->pub.trunk;
1555
1556 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1557
1558 switch (treq->pub.state) {
1563 break;
1564
1565 default:
1567 }
1568
1570 DO_REQUEST_COMPLETE(treq);
1571 trunk_request_free(&treq); /* Free the request */
1572}
1573
1574/** Request failed, inform the API client and free the request
1575 *
1576 * @note treq will be inviable after a call to this function.
 1577 * treq->tconn may also be inviable due to the call to
1578 * trunk_request_remove_from_conn.
1579 *
1580 * @param[in] treq to mark as failed.
1581 */
1583{
1584 trunk_connection_t *tconn = treq->pub.tconn;
1585 trunk_t *trunk = treq->pub.trunk;
1586 trunk_request_state_t prev = treq->pub.state;
1587
1588 if (!fr_cond_assert(!tconn || (tconn->pub.trunk == trunk))) return;
1589
1590 switch (treq->pub.state) {
1593 break;
1594
1595 default:
1597 break;
1598 }
1599
1601 DO_REQUEST_FAIL(treq, prev);
1602 trunk_request_free(&treq); /* Free the request */
1603}
1604
1605/** Check to see if a trunk request can be enqueued
1606 *
1607 * @param[out] tconn_out Connection the request may be enqueued on.
1608 * @param[in] trunk To enqueue requests on.
1609 * @param[in] request associated with the treq (if any).
1610 * @return
1611 * - TRUNK_ENQUEUE_OK caller should enqueue request on provided tconn.
1612 * - TRUNK_ENQUEUE_IN_BACKLOG Request should be queued in the backlog.
1613 * - TRUNK_ENQUEUE_NO_CAPACITY Unable to enqueue request as we have no spare
1614 * connections or backlog space.
1615 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Can't enqueue because the destination is
1616 * unreachable.
1617 */
1619 request_t *request)
1620{
1621 trunk_connection_t *tconn;
1622 /*
1623 * If we have an active connection then
1624 * return that.
1625 */
1626 tconn = fr_minmax_heap_min_peek(trunk->active);
1627 if (tconn) {
1628 *tconn_out = tconn;
1629 return TRUNK_ENQUEUE_OK;
1630 }
1631
1632 /*
1633 * Unlike the connection pool, we don't need
1634 * to drive any internal processes by feeding
1635 * it requests.
1636 *
1637 * If the last event to occur was a failure
1638 * we refuse to enqueue new requests until
1639 * one or more connections comes online.
1640 */
1641 if (!trunk->conf.backlog_on_failed_conn &&
1642 fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
1643 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed)) {
1645 RWARN, WARN, "Refusing to enqueue requests - "
1646 "No active connections and last event was a connection failure");
1647
1649 }
1650
1651
1652 /*
1653 * Only enforce if we're limiting maximum
1654 * number of connections, and maximum
1655 * number of requests per connection.
1656 *
1657 * The alloc function also checks this
1658 * which is why this is only done for
1659 * debug builds.
1660 */
1661 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
1662 uint64_t limit;
1663
1664 limit = trunk->conf.max * (uint64_t)trunk->conf.max_req_per_conn;
1665 if (limit > 0) {
1666 uint64_t total_reqs;
1667
1668 total_reqs = trunk_request_count_by_state(trunk, TRUNK_CONN_ALL,
1670 if (total_reqs >= (limit + trunk->conf.max_backlog)) {
1672 RWARN, WARN, "Refusing to alloc requests - "
1673 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
1674 "plus %u backlog requests reached",
1675 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
1676 trunk->conf.max_backlog);
1678 }
1679 }
1680 }
1681
1683}
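/* Illustrative note, not part of the original file: with the defaults from the
 * configuration tables above (max = 5 connections, per_connection_max = 2000,
 * max_backlog = 1000) the limit computed here is 5 * 2000 = 10000, so the rate
 * limited warning fires and further allocation is refused once 11000 or more
 * requests are outstanding across the trunk and its backlog.
 */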
1684
1685/** Enqueue a request which has never been assigned to a connection or was previously cancelled
1686 *
 1687 * @param[in] treq to re-enqueue. Must have been removed
1688 * from its existing connection with
1689 * #trunk_connection_requests_dequeue.
1690 * @return
1691 * - TRUNK_ENQUEUE_OK Request was re-enqueued.
1692 * - TRUNK_ENQUEUE_NO_CAPACITY Request enqueueing failed because we're at capacity.
1693 * - TRUNK_ENQUEUE_DST_UNAVAILABLE Enqueuing failed for some reason.
1694 * Usually because the connection to the resource is down.
1695 */
1697{
1698 trunk_t *trunk = treq->pub.trunk;
1699 trunk_connection_t *tconn = NULL;
1700 trunk_enqueue_t ret;
1701
1702 /*
1703 * Must *NOT* still be assigned to another connection
1704 */
1705 fr_assert(!treq->pub.tconn);
1706
1707 ret = trunk_request_check_enqueue(&tconn, trunk, treq->pub.request);
1708 switch (ret) {
1709 case TRUNK_ENQUEUE_OK:
1710 if (trunk->conf.always_writable) {
1712 trunk_request_enter_pending(treq, tconn, false);
1715 } else {
1716 trunk_request_enter_pending(treq, tconn, false);
1717 }
1718 break;
1719
1721 /*
1722 * No more connections and request
1723 * is already in the backlog.
1724 *
1725 * Signal our caller it should stop
1726 * trying to drain the backlog.
1727 */
1729 trunk_request_enter_backlog(treq, false);
1730 break;
1731
1732 default:
1733 break;
1734 }
1735
1736 return ret;
1737}
1738
1739/** Shift requests in the specified states onto new connections
1740 *
1741 * This function will blindly dequeue any requests in the specified state and get
1742 * them back to the unassigned state, cancelling any sent or partially sent requests.
1743 *
1744 * This function does not check that dequeuing a request in a particular state is a
1745 * sane or sensible thing to do, that's up to the caller!
1746 *
1747 * @param[out] out A list to insert the newly dequeued and unassigned
1748 * requests into.
1749 * @param[in] tconn to dequeue requests from.
1750 * @param[in] states Dequeue request in these states.
1751 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
1752 */
1754 int states, uint64_t max)
1755{
1756 trunk_request_t *treq;
1757 uint64_t count = 0;
1758
1759 if (max == 0) max = UINT64_MAX;
1760
1761#define OVER_MAX_CHECK if (++count > max) return (count - 1)
1762
1763#define DEQUEUE_ALL(_src_list, _state) do { \
1764 while ((treq = fr_dlist_head(_src_list))) { \
1765 OVER_MAX_CHECK; \
1766 fr_assert(treq->pub.state == (_state)); \
1767 trunk_request_enter_unassigned(treq); \
1768 fr_dlist_insert_tail(out, treq); \
1769 } } while (0)
1770
1771 /*
1772 * Don't need to do anything with
1773 * cancellation requests.
1774 */
1775 if (states & TRUNK_REQUEST_STATE_CANCEL) DEQUEUE_ALL(&tconn->cancel,
1777
1778 /*
1779 * ...same with cancel inform
1780 */
1783
1784 /*
1785 * ....same with cancel partial
1786 */
1789 treq = tconn->cancel_partial;
1790 if (treq) {
1794 }
1795 }
1796
1797 /*
1798 * ...and pending.
1799 */
1800 if (states & TRUNK_REQUEST_STATE_PENDING) {
1801 while ((treq = fr_heap_peek(tconn->pending))) {
1806 }
1807 }
1808
1809 /*
1810 * Cancel partially sent requests
1811 */
1812 if (states & TRUNK_REQUEST_STATE_PARTIAL) {
1814 treq = tconn->partial;
1815 if (treq) {
1817
1818 /*
1819 * Don't allow the connection to change state whilst
1820 * we're draining requests from it.
1821 */
1827 }
1828 }
1829
1830 /*
1831 * Cancel sent requests
1832 */
1833 if (states & TRUNK_REQUEST_STATE_SENT) {
1834 /*
1835 * Don't allow the connection to change state whilst
1836 * we're draining requests from it.
1837 */
1839 while ((treq = fr_dlist_head(&tconn->sent))) {
1842
1846 }
1848 }
1849
1850 return count;
1851}
1852
1853/** Remove requests in specified states from a connection, attempting to distribute them to new connections
1854 *
1855 * @param[in] tconn To remove requests from.
1856 * @param[in] states One or more states or'd together.
1857 * @param[in] max The maximum number of requests to dequeue.
1858 * 0 for unlimited.
1859 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
1860 * If false bound requests will not be moved.
1861 *
1862 * @return the number of requests re-queued.
1863 */
1864static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
1865{
1866 trunk_t *trunk = tconn->pub.trunk;
1867 fr_dlist_head_t to_process;
1868 trunk_request_t *treq = NULL;
1869 uint64_t moved = 0;
1870
1871 if (max == 0) max = UINT64_MAX;
1872
1873 fr_dlist_talloc_init(&to_process, trunk_request_t, entry);
1874
1875 /*
1876 * Prevent the connection changing state whilst we're
1877 * working with it.
1878 *
1879 * There's a user callback that can be called by
1880 * trunk_request_enqueue_existing which can reconnect
1881 * the connection.
1882 */
1884
1885 /*
1886 * Remove non-cancelled requests from the connection
1887 */
1888 moved += trunk_connection_requests_dequeue(&to_process, tconn, states & ~TRUNK_REQUEST_STATE_CANCEL_ALL, max);
1889
1890 /*
1891 * Prevent requests being requeued on the same trunk
1892 * connection, which would break rebalancing.
1893 *
1894 * This is a bit of a hack, but nothing should test
1895 * for connection/list consistency in this code,
1896 * and if something is added later, it'll be flagged
1897 * by the tests.
1898 */
1899 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1900 int ret;
1901
1902 ret = fr_minmax_heap_extract(trunk->active, tconn);
1903 if (!fr_cond_assert_msg(ret == 0,
1904 "Failed extracting conn from active heap: %s", fr_strerror())) goto done;
1905
1906 }
1907
1908 /*
1909 * Loop over all the requests we gathered and
1910 * redistribute them to new connections.
1911 */
1912 while ((treq = fr_dlist_next(&to_process, treq))) {
1913 trunk_request_t *prev;
1914
1915 prev = fr_dlist_remove(&to_process, treq);
1916
1917 /*
1918 * Attempting to re-queue a request
1919 * that's bound to a connection
1920 * results in a failure.
1921 */
1922 if (treq->bound_to_conn) {
1923 if (fail_bound || !IS_SERVICEABLE(tconn)) {
1925 } else {
1926 trunk_request_enter_pending(treq, tconn, false);
1927 }
1928 goto next;
1929 }
1930
1931 switch (trunk_request_enqueue_existing(treq)) {
1932 case TRUNK_ENQUEUE_OK:
1933 break;
1934
1935 /*
1936 * A connection failed, and
1937 * there are no other connections
1938 * available to deal with the
1939 * load, so the request has been
1940 * placed back in the backlog.
1941 */
1943 break;
1944
1945 /*
1946 * If we fail to re-enqueue then
1947 * there's nothing to do except
1948 * fail the request.
1949 */
1952 case TRUNK_ENQUEUE_FAIL:
1954 break;
1955 }
1956 next:
1957 treq = prev;
1958 }
1959
1960 /*
1961 * Add the connection back into the active list
1962 */
1963 if (tconn->pub.state == TRUNK_CONN_ACTIVE) {
1964 int ret;
1965
1966 ret = fr_minmax_heap_insert(trunk->active, tconn);
1967 if (!fr_cond_assert_msg(ret == 0,
1968 "Failed re-inserting conn into active heap: %s", fr_strerror())) goto done;
1969 }
1970 if (moved >= max) goto done;
1971
1972 /*
1973 * Deal with the cancelled requests specially; we can't
1974 * queue them up again as they were only valid on that
1975 * specific connection.
1976 *
1977 * We just need to run them to completion which, as
1978 * they should already be in the unassigned state,
1979 * just means freeing them.
1980 */
1981 moved += trunk_connection_requests_dequeue(&to_process, tconn,
1982 states & TRUNK_REQUEST_STATE_CANCEL_ALL, max - moved);
1983 while ((treq = fr_dlist_next(&to_process, treq))) {
1984 trunk_request_t *prev;
1985
1986 prev = fr_dlist_remove(&to_process, treq);
1987 trunk_request_free(&treq);
1988 treq = prev;
1989 }
1990
1991done:
1992
1993 /*
1994 * Always re-calculate the request/connection
1995 * ratio at the end.
1996 *
1997 * This avoids having the state transition
1998 * functions do it.
1999 *
2000 * The ratio would be wrong when they calculated
2001 * it anyway, because a bunch of requests are
2002 * dequeued from the connection and temporarily
2003 * cease to exist from the perspective of the
2004 * trunk_requests_per_connection code.
2005 */
2006 trunk_requests_per_connection(NULL, NULL, trunk, fr_time(), false);
2007
2009 return moved;
2010}
2011
2012/** Move requests off of a connection and requeue elsewhere
2013 *
2014 * @note We don't re-queue on draining or draining to free, as requests should have already been
2015 * moved off of the connection. It's also dangerous, as the trunk management code may
2016 * clean up a connection in this state when it runs during the re-queue, and the caller
2017 * may then try to access a now freed connection.
2018 *
2019 * @param[in] tconn to move requests off of.
2020 * @param[in] states Only move requests in this state.
2021 * @param[in] max The maximum number of requests to dequeue. 0 for unlimited.
2022 * @param[in] fail_bound If true causes any requests bound to the connection to fail.
2023 * If false bound requests will not be moved.
2024 * @return The number of requests requeued.
2025 */
2026uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
2027{
2028 switch (tconn->pub.state) {
2029 case TRUNK_CONN_ACTIVE:
2030 case TRUNK_CONN_FULL:
2032 return trunk_connection_requests_requeue_priv(tconn, states, max, fail_bound);
2033
2034 default:
2035 return 0;
2036 }
2037}
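
/*
 *	Illustrative usage sketch, not part of trunk.c: an API client that has
 *	decided a connection should no longer carry traffic can move its
 *	outstanding work to other connections before reconnecting it.  The
 *	wrapper function below is an assumption; the trunk calls are those
 *	defined above.
 */
static void my_module_evacuate(trunk_connection_t *tconn)
{
	uint64_t moved;

	/*
	 *	Move requests in any state to other connections,
	 *	failing any requests bound to this connection.
	 */
	moved = trunk_connection_requests_requeue(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
	DEBUG3("Requeued %" PRIu64 " requests", moved);
}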
2038
2039/** Signal a partial write
2040 *
2041 * Used where there's high load and the outbound write buffer is full.
2042 *
2043 * @param[in] treq to signal state change for.
2044 */
2046{
2047 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2048
2050 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2051
2052 switch (treq->pub.state) {
2055 break;
2056
2057 default:
2058 return;
2059 }
2060}
2061
2062/** Signal that the request was written to a connection successfully
2063 *
2064 * @param[in] treq to signal state change for.
2065 */
2067{
2068 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2069
2071 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2072
2073 switch (treq->pub.state) {
2077 break;
2078
2079 default:
2080 return;
2081 }
2082}
2083
2084/** Signal that the request was written to a connection successfully, but no response is expected
2085 *
2086 * @param[in] treq to signal state change for.
2087 */
2089{
2090 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2091
2093 "%s can only be called from within request_mux handler", __FUNCTION__)) return;
2094
2095 switch (treq->pub.state) {
2099 break;
2100
2101 default:
2102 return;
2103 }
2104}
2105
2106/** Signal that a trunk request is complete
2107 *
2108 * The API client will be informed that the request is now complete.
2109 */
2111{
2112 trunk_t *trunk = treq->pub.trunk;
2113
2114 if (!fr_cond_assert_msg(trunk, "treq not associated with trunk")) return;
2115
2116 /*
2117 * We assume that if the request is being signalled
2118 * as complete from the demux function, that it was
2119 * a successful read.
2120 *
2121 * If this assumption turns out to be incorrect
2122 * then we need to add an argument to signal_complete
2123 * to indicate if this is a successful read.
2124 */
2125 if (IN_REQUEST_DEMUX(trunk)) {
2126 trunk_connection_t *tconn = treq->pub.tconn;
2127
2128 trunk->pub.last_read_success = fr_time();
2130 }
2131
2132 switch (treq->pub.state) {
2134 case TRUNK_REQUEST_STATE_PENDING: /* Got immediate response, i.e. cached */
2137 break;
2138
2139 default:
2140 return;
2141 }
2142}
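
/*
 *	Illustrative sketch, not part of trunk.c: the tail end of a
 *	request_demux handler once a response has been decoded and matched
 *	back to its treq.  my_rctx_t and my_response_t are assumptions standing
 *	in for protocol-specific types; the signalling call is the function
 *	defined above.
 */
static void my_handle_response(trunk_request_t *treq, my_response_t *response)
{
	my_rctx_t *rctx = treq->pub.rctx;

	rctx->response = response;		/* Hand the decoded result back to the yielded request */

	/*
	 *	The API client's "complete" callback will now be run,
	 *	and the original request_t can be resumed.
	 */
	trunk_request_signal_complete(treq);
}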
2143
2144/** Signal that a trunk request failed
2145 *
2146 * The API client will be informed that the request has failed.
2147 */
2149{
2150 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2151
2153}
2154
2155/** Cancel a trunk request
2156 *
2157 * treq can be in any state, but requests to cancel will be ignored if the treq is not in
2158 * the TRUNK_REQUEST_STATE_PARTIAL or TRUNK_REQUEST_STATE_SENT state.
2159 *
2160 * The complete or failed callbacks will not be called here, as it's assumed the request_t *
2161 * is no longer viable because it's being cancelled.
2162 *
2163 * The free function, however, is called, and should be used to perform any necessary
2164 * cleanup.
2165 *
2166 * @param[in] treq to signal state change for.
2167 */
2169{
2170 trunk_t *trunk;
2171
2172 /*
2173 * Ensure treq hasn't been freed
2174 */
2175 (void)talloc_get_type_abort(treq, trunk_request_t);
2176
2177 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2178
2180 "%s cannot be called within a handler", __FUNCTION__)) return;
2181
2182 trunk = treq->pub.trunk;
2183
2184 switch (treq->pub.state) {
2185 /*
2186 * We don't call the complete or failed callbacks
2187 * as the request and rctx are no longer viable.
2188 */
2191 {
2192 trunk_connection_t *tconn = treq->pub.tconn;
2193
2194 /*
2195 * Don't allow connection state changes
2196 */
2200 "Bad state %s after cancellation",
2201 fr_table_str_by_value(trunk_request_states, treq->pub.state, "<INVALID>"))) {
2203 return;
2204 }
2205 /*
2206 * No cancel muxer. We're done.
2207 *
2208 * If we do have a cancel mux function,
2209 * the next time this connection becomes
2210 * writable, we'll call the cancel mux
2211 * function.
2212 *
2213 * We don't run the complete or failed
2214 * callbacks here as the request is
2215 * being cancelled.
2216 */
2217 if (!trunk->funcs.request_cancel_mux) {
2219 trunk_request_free(&treq);
2220 }
2222 }
2223 break;
2224
2225 /*
2226 * We're already in the process of cancelling a
2227 * request, so ignore duplicate signals.
2228 */
2233 break;
2234
2235 /*
2236 * For any other state, we just release the request
2237 * from its current connection and free it.
2238 */
2239 default:
2241 trunk_request_free(&treq);
2242 break;
2243 }
2244}
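
/*
 *	Illustrative sketch, not part of trunk.c: cancelling from the API
 *	client when the originating request_t is being stopped.  The rctx
 *	layout is an assumption, and the name of the cancel signal function
 *	defined above (trunk_request_signal_cancel) should be checked against
 *	trunk.h, as its declaration is elided in this listing.
 */
static void my_module_signal_stop(my_rctx_t *rctx)
{
	if (!rctx->treq) return;

	trunk_request_signal_cancel(rctx->treq);	/* Must not be called from within a trunk handler */
	rctx->treq = NULL;				/* The treq may already have been freed by the signal */
}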
2245
2246/** Signal a partial cancel write
2247 *
2248 * Used where there's high load and the outbound write buffer is full.
2249 *
2250 * @param[in] treq to signal state change for.
2251 */
2253{
2254 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2255
2257 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2258
2259 switch (treq->pub.state) {
2262 break;
2263
2264 default:
2265 return;
2266 }
2267}
2268
2269/** Signal that a remote server has been notified of the cancellation
2270 *
2271 * Called from request_cancel_mux to indicate that the datastore has been informed
2272 * that the response is no longer needed.
2273 *
2274 * @param[in] treq to signal state change for.
2275 */
2277{
2278 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2279
2281 "%s can only be called from within request_cancel_mux handler", __FUNCTION__)) return;
2282
2283 switch (treq->pub.state) {
2287 break;
2288
2289 default:
2290 break;
2291 }
2292}
2293
2294/** Signal that a remote server acked our cancellation
2295 *
2296 * Called from request_demux to indicate that it got an ack for the cancellation.
2297 *
2298 * @param[in] treq to signal state change for.
2299 */
2301{
2302 if (!fr_cond_assert_msg(treq->pub.trunk, "treq not associated with trunk")) return;
2303
2305 "%s can only be called from within request_demux or request_cancel_mux handlers",
2306 __FUNCTION__)) return;
2307
2308 switch (treq->pub.state) {
2310 /*
2311 * This is allowed, as we may not need to wait
2312 * for the database to ACK our cancellation
2313 * request.
2314 *
2315 * Note: TRUNK_REQUEST_STATE_CANCEL_PARTIAL
2316 * is not allowed here, as that'd mean we'd half
2317 * written the cancellation request out to the
2318 * socket, and then decided to abandon it.
2319 *
2320 * That'd leave the socket in an unusable state.
2321 */
2324 break;
2325
2326 default:
2327 break;
2328 }
2329}
2330
2331/** If the trunk request is freed then update the target requests
2332 *
2333 * gperftools showed calling the request free function directly was slightly faster
2334 * than using talloc_free.
2335 *
2336 * @param[in] treq_to_free request.
2337 */
2339{
2340 trunk_request_t *treq = *treq_to_free;
2341 trunk_t *trunk = treq ? treq->pub.trunk : NULL;
2342
2343 if (unlikely(!treq)) return;
2344
2345 /*
2346 * The only valid states a trunk request can be
2347 * freed from.
2348 */
2349 switch (treq->pub.state) {
2355 break;
2356
2357 default:
2358 if (!fr_cond_assert(0)) return;
2359 }
2360
2361 /*
2362 * Zero out the pointer to prevent double frees
2363 */
2364 *treq_to_free = NULL;
2365
2366 /*
2367 * Call the API client callback to free
2368 * any associated memory.
2369 */
2370 DO_REQUEST_FREE(treq);
2371
2372 /*
2373 * Update the last above/below target stats
2374 * We only do this when we alloc or free
2375 * connections, or on connection
2376 * state changes.
2377 */
2378 trunk_requests_per_connection(NULL, NULL, treq->pub.trunk, fr_time(), false);
2379
2380 /*
2381 * This tracks the total number of requests
2382 * allocated and not freed or returned to
2383 * the free list.
2384 */
2385 if (fr_cond_assert(trunk->pub.req_alloc > 0)) trunk->pub.req_alloc--;
2386
2387 /*
2388 * No cleanup delay means cleanup immediately
2389 */
2392
2393#ifndef NDEBUG
2394 /*
2395 * Ensure anything parented off the treq
2396 * is freed. We do this to trigger
2397 * the destructors for the log entries.
2398 */
2399 talloc_free_children(treq);
2400
2401 /*
2402 * State log should now be empty as entries
2403 * remove themselves from the dlist
2404 * on free.
2405 */
2407 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2408#endif
2409
2410 talloc_free(treq);
2411 return;
2412 }
2413
2414 /*
2415 * Ensure anything parented off the treq
2416 * is freed.
2417 */
2418 talloc_free_children(treq);
2419
2420#ifndef NDEBUG
2421 /*
2422 * State log should now be empty as entries
2423 * remove themselves from the dlist
2424 * on free.
2425 */
2427 "Should have 0 remaining log entries, have %u", fr_dlist_num_elements(&treq->log));
2428#endif
2429
2430 /*
2431 * Return the trunk request to the
2432 * init state.
2433 */
2434 *treq = (trunk_request_t){
2435 .pub = {
2437 .trunk = treq->pub.trunk,
2438 },
2439 .cancel_reason = TRUNK_CANCEL_REASON_NONE,
2440 .last_freed = fr_time(),
2441#ifndef NDEBUG
2442 .log = treq->log /* Keep the list head, to save reinitialisation */
2443#endif
2444 };
2445
2446 /*
2447 * Insert at the head, so that we can free
2448 * requests that have been unused for N
2449 * seconds from the tail.
2450 */
2451 fr_dlist_insert_tail(&trunk->free_requests, treq);
2452}
2453
2454/** Actually free the trunk request
2455 *
2456 */
2458{
2459 trunk_t *trunk = treq->pub.trunk;
2460
2461 switch (treq->pub.state) {
2464 break;
2465
2466 default:
2467 fr_assert(0);
2468 break;
2469 }
2470
2471 fr_dlist_remove(&trunk->free_requests, treq);
2472
2473 return 0;
2474}
2475
2476/** (Pre-)Allocate a new trunk request
2477 *
2478 * If trunk->conf.req_pool_headers or trunk->conf.req_pool_size are not zero then the
2479 * request will be a talloc pool, which can be used to hold the preq.
2480 *
2481 * @note Do not use MEM to check the result of this allocation as it may fail for
2482 * non-fatal reasons.
2483 *
2484 * @param[in] trunk to add request to.
2485 * @param[in] request to wrap in a trunk request (treq).
2486 * @return
2487 * - A newly allocated request.
2488 * - NULL if too many requests are allocated.
2489 */
2491{
2492 trunk_request_t *treq;
2493
2494 /*
2495 * The number of treqs currently allocated
2496 * exceeds the maximum number allowed.
2497 */
2498 if (trunk->conf.max_req_per_conn && trunk->conf.max) {
2499 uint64_t limit;
2500
2501 limit = (uint64_t) trunk->conf.max_req_per_conn * trunk->conf.max;
2502 if (trunk->pub.req_alloc >= (limit + trunk->conf.max_backlog)) {
2504 RWARN, WARN, "Refusing to alloc requests - "
2505 "Limit of %"PRIu64" (max = %u * per_connection_max = %u) "
2506 "plus %u backlog requests reached",
2507 limit, trunk->conf.max, trunk->conf.max_req_per_conn,
2508 trunk->conf.max_backlog);
2509 return NULL;
2510 }
2511 }
2512
2513 /*
2514 * Allocate or reuse an existing request
2515 */
2516 treq = fr_dlist_head(&trunk->free_requests);
2517 if (treq) {
2518 fr_dlist_remove(&trunk->free_requests, treq);
2520 fr_assert(treq->pub.trunk == trunk);
2521 fr_assert(treq->pub.tconn == NULL);
2524 trunk->pub.req_alloc_reused++;
2525 } else {
2527 trunk->conf.req_pool_headers, trunk->conf.req_pool_size));
2528 talloc_set_destructor(treq, _trunk_request_free);
2529
2530 *treq = (trunk_request_t){
2531 .pub = {
2533 .trunk = trunk
2534 },
2535 .cancel_reason = TRUNK_CANCEL_REASON_NONE
2536 };
2537 trunk->pub.req_alloc_new++;
2538#ifndef NDEBUG
2540#endif
2541 }
2542
2543 trunk->pub.req_alloc++;
2545 /* heap_id - initialised when treq inserted into pending */
2546 /* list - empty */
2547 /* preq - populated later */
2548 /* rctx - populated later */
2549 treq->pub.request = request;
2550
2551 return treq;
2552}
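
/*
 *	Illustrative sketch, not part of trunk.c: pre-allocating a treq so
 *	that the preq can be parented off the treq's talloc pool (when
 *	req_pool_headers/req_pool_size are configured).  my_preq_t and the
 *	wrapper function are assumptions.
 */
static trunk_request_t *my_treq_prealloc(trunk_t *trunk, request_t *request, my_preq_t **preq_out)
{
	trunk_request_t *treq;

	treq = trunk_request_alloc(trunk, request);
	if (!treq) return NULL;			/* Too many requests allocated, not fatal */

	/*
	 *	Parented by the treq, so the memory is drawn from the treq's
	 *	talloc pool (if one was configured) and freed with the treq.
	 */
	*preq_out = talloc_zero(treq, my_preq_t);

	return treq;				/* Pass treq and *preq_out to the enqueue function later */
}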
2553
2554/** Enqueue a request that needs data written to the trunk
2555 *
2556 * When a request_t * needs to make an asynchronous request to an external datastore
2557 * it should call this function, specifying a preq (protocol request) containing
2558 * the data necessary to request information from the external datastore, and an
2559 * rctx (resume ctx) used to hold the decoded response and/or any error codes.
2560 *
2561 * After a treq is successfully enqueued it will either be assigned immediately
2562 * to the pending queue of a connection, or if no connections are available,
2563 * (depending on the trunk configuration) the treq will be placed in the trunk's
2564 * global backlog.
2565 *
2566 * After receiving a positive return code from this function the caller should
2567 * immediately yield, to allow the various timers and I/O handlers that drive tconn
2568 * (trunk connection) and treq state changes to be called.
2569 *
2570 * When a tconn becomes writable (or the trunk is configured to be always writable)
2571 * the #trunk_request_mux_t callback will be called to dequeue, encode and
2572 * send any pending requests for that tconn. The #trunk_request_mux_t callback
2573 * is also responsible for tracking the outbound requests to allow the
2574 * #trunk_request_demux_t callback to match inbound responses with the original
2575 * treq. Once the #trunk_request_mux_t callback is done processing the treq
2576 * it signals what state the treq should enter next using one of the
2577 * trunk_request_signal_* functions.
2578 *
2579 * When a tconn becomes readable the user specified #trunk_request_demux_t
2580 * callback is called to process any responses, match them with the original treq,
2581 * and signal what state they should enter next using one of the
2582 * trunk_request_signal_* functions.
2583 *
2584 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2585 * is NULL, a new treq will be allocated.
2586 * Otherwise treq should point to memory allocated
2587 * with trunk_request_alloc.
2588 * @param[in] trunk to enqueue request on.
2589 * @param[in] request to enqueue.
2590 * @param[in] preq Protocol request to write out. Will be freed when
2591 * treq is freed. Should ideally be parented by the
2592 * treq if possible.
2593 * Use #trunk_request_alloc for pre-allocation of
2594 * the treq.
2595 * @param[in] rctx The resume context to write any result to.
2596 * @return
2597 * - TRUNK_ENQUEUE_OK.
2598 * - TRUNK_ENQUEUE_IN_BACKLOG.
2599 * - TRUNK_ENQUEUE_NO_CAPACITY.
2600 * - TRUNK_ENQUEUE_DST_UNAVAILABLE
2601 * - TRUNK_ENQUEUE_FAIL
2602 */
2604 request_t *request, void *preq, void *rctx)
2605{
2606 trunk_connection_t *tconn = NULL;
2607 trunk_request_t *treq;
2608 trunk_enqueue_t ret;
2609
2610 if (!fr_cond_assert_msg(!IN_HANDLER(trunk),
2611 "%s cannot be called within a handler", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2612
2613 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2614 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2615
2616 /*
2617 * If delay_start was set, we may need
2618 * to insert the timer for the connection manager.
2619 */
2620 if (unlikely(!trunk->started)) {
2621 if (trunk_start(trunk) < 0) return TRUNK_ENQUEUE_FAIL;
2622 }
2623
2624 ret = trunk_request_check_enqueue(&tconn, trunk, request);
2625 switch (ret) {
2626 case TRUNK_ENQUEUE_OK:
2627 if (*treq_out) {
2628 treq = *treq_out;
2629 } else {
2630 *treq_out = treq = trunk_request_alloc(trunk, request);
2631 if (!treq) return TRUNK_ENQUEUE_FAIL;
2632 }
2633 treq->pub.preq = preq;
2634 treq->pub.rctx = rctx;
2635 if (trunk->conf.always_writable) {
2637 trunk_request_enter_pending(treq, tconn, true);
2640 } else {
2641 trunk_request_enter_pending(treq, tconn, true);
2642 }
2643 break;
2644
2646 if (*treq_out) {
2647 treq = *treq_out;
2648 } else {
2649 *treq_out = treq = trunk_request_alloc(trunk, request);
2650 if (!treq) return TRUNK_ENQUEUE_FAIL;
2651 }
2652 treq->pub.preq = preq;
2653 treq->pub.rctx = rctx;
2654 trunk_request_enter_backlog(treq, true);
2655 break;
2656
2657 default:
2658 /*
2659 * If a trunk request was provided
2660 * populate the preq and rctx fields
2661 * so that if it's freed with
2662 * trunk_request_free, the free
2663 * function works as intended.
2664 */
2665 if (*treq_out) {
2666 treq = *treq_out;
2667 treq->pub.preq = preq;
2668 treq->pub.rctx = rctx;
2669 }
2670 return ret;
2671 }
2672
2673 return ret;
2674}
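
/*
 *	Illustrative sketch, not part of trunk.c: enqueueing work from a module.
 *	my_preq_t / my_rctx_t are assumptions standing in for protocol-specific
 *	types, and the public name of the enqueue function defined above
 *	(trunk_request_enqueue) should be checked against trunk.h, as its
 *	declaration is elided in this listing.  After an OK return the caller
 *	is expected to yield, as described above.
 */
static int my_module_start(trunk_t *trunk, request_t *request, my_preq_t *preq, my_rctx_t *rctx)
{
	trunk_request_t *treq = NULL;	/* NULL means the trunk allocates the treq for us */

	switch (trunk_request_enqueue(&treq, trunk, request, preq, rctx)) {
	case TRUNK_ENQUEUE_OK:
	case TRUNK_ENQUEUE_IN_BACKLOG:
		rctx->treq = treq;	/* Remember it so we can cancel later if needed */
		return 0;		/* Yield and wait for the complete/fail callbacks */

	default:			/* No capacity, destination unavailable, or failure */
		return -1;
	}
}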
2675
2676/** Re-enqueue a request on the same connection
2677 *
2678 * If the treq has been sent, we assume that we're being signalled to requeue
2679 * because something outside of the trunk API has determined that a retransmission
2680 * is required. The easiest way to perform that retransmission is to clean up
2681 * any tracking information for the request, and then requeue it for transmission.
2682 *
2683 * If re-queueing fails, the request will enter the fail state. It should not be
2684 * accessed if this occurs.
2685 *
2686 * @param[in] treq to requeue (retransmit).
2687 * @return
2688 * - TRUNK_ENQUEUE_OK.
2689 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2690 * - TRUNK_ENQUEUE_FAIL - Request isn't in a valid state to be reassigned.
2691 */
2693{
2694 trunk_connection_t *tconn = treq->pub.tconn; /* Existing conn */
2695
2696 if (!tconn) return TRUNK_ENQUEUE_FAIL;
2697
2698 if (!IS_PROCESSING(tconn)) {
2701 }
2702
2703 switch (treq->pub.state) {
2709 trunk_request_enter_pending(treq, tconn, false);
2710 if (treq->pub.trunk->conf.always_writable) {
2712 }
2714 break;
2715
2716 case TRUNK_REQUEST_STATE_BACKLOG: /* Do nothing.... */
2717 case TRUNK_REQUEST_STATE_PENDING: /* Do nothing.... */
2718 break;
2719
2720 default:
2722 return TRUNK_ENQUEUE_FAIL;
2723 }
2724
2725 return TRUNK_ENQUEUE_OK;
2726}
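
/*
 *	Illustrative sketch, not part of trunk.c: driving a retransmission from
 *	a module retry timer.  The name of the requeue function defined above
 *	(trunk_request_requeue) is an assumption, as its declaration is elided
 *	in this listing.
 */
static void my_retry_timer_fired(trunk_request_t *treq)
{
	switch (trunk_request_requeue(treq)) {
	case TRUNK_ENQUEUE_OK:
		break;

	default:
		/*
		 *	The treq has entered the failed state and must not
		 *	be accessed again (see the note above).
		 */
		break;
	}
}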
2727
2728/** Enqueue additional requests on a specific connection
2729 *
2730 * This may be used to create a series of requests on a single connection, or to generate
2731 * in-band status checks.
2732 *
2733 * @note If conf->always_writable, then the muxer will be called immediately. The caller
2734 * must be able to handle multiple calls to its muxer gracefully.
2735 *
2736 * @param[in,out] treq_out A trunk request handle. If the memory pointed to
2737 * is NULL, a new treq will be allocated.
2738 * Otherwise treq should point to memory allocated
2739 * with trunk_request_alloc.
2740 * @param[in] tconn to enqueue request on.
2741 * @param[in] request to enqueue.
2742 * @param[in] preq Protocol request to write out. Will be freed when
2743 * treq is freed. Should ideally be parented by the
2744 * treq if possible.
2745 * Use #trunk_request_alloc for pre-allocation of
2746 * the treq.
2747 * @param[in] rctx The resume context to write any result to.
2748 * @param[in] ignore_limits Ignore max_req_per_conn. Useful to force status
2749 * checks through even if the connection is at capacity.
2750 * Will also allow enqueuing on "inactive", "draining",
2751 * "draining-to-free" connections.
2752 * @return
2753 * - TRUNK_ENQUEUE_OK.
2754 * - TRUNK_ENQUEUE_NO_CAPACITY - At max_req_per_conn_limit
2755 * - TRUNK_ENQUEUE_DST_UNAVAILABLE - Connection cannot service requests.
2756 */
2758 request_t *request, void *preq, void *rctx,
2759 bool ignore_limits)
2760{
2761 trunk_request_t *treq;
2762 trunk_t *trunk = tconn->pub.trunk;
2763
2764 if (!fr_cond_assert_msg(!*treq_out || ((*treq_out)->pub.state == TRUNK_REQUEST_STATE_INIT),
2765 "%s requests must be in \"init\" state", __FUNCTION__)) return TRUNK_ENQUEUE_FAIL;
2766
2768
2769 /*
2770 * Limits check
2771 */
2772 if (!ignore_limits) {
2773 if (trunk->conf.max_req_per_conn &&
2776
2778 }
2779
2780 if (*treq_out) {
2781 treq = *treq_out;
2782 } else {
2783 MEM(*treq_out = treq = trunk_request_alloc(trunk, request));
2784 }
2785
2786 treq->pub.preq = preq;
2787 treq->pub.rctx = rctx;
2788 treq->bound_to_conn = true; /* Don't let the request be transferred */
2789
2790 if (trunk->conf.always_writable) {
2792 trunk_request_enter_pending(treq, tconn, true);
2795 } else {
2796 trunk_request_enter_pending(treq, tconn, true);
2797 }
2798
2799 return TRUNK_ENQUEUE_OK;
2800}
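
/*
 *	Illustrative sketch, not part of trunk.c: forcing an in-band status
 *	check onto a specific connection.  my_status_preq_t and
 *	my_status_preq_alloc() are assumptions, and the name of the function
 *	defined above (trunk_request_enqueue_on_conn) should be checked against
 *	trunk.h, as its declaration is elided in this listing.
 */
static void my_status_check_start(trunk_connection_t *tconn)
{
	trunk_request_t *treq = NULL;
	my_status_preq_t *preq = my_status_preq_alloc(tconn);

	/*
	 *	ignore_limits = true, so the check is enqueued even if the
	 *	connection is full, inactive, or draining.  No request_t is
	 *	associated with the status check, so request is NULL.
	 */
	if (trunk_request_enqueue_on_conn(&treq, tconn, NULL, preq, NULL, true) != TRUNK_ENQUEUE_OK) {
		talloc_free(preq);
	}
}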
2801
2802#ifndef NDEBUG
2803/** Used for sanity checks to ensure all log entries have been freed
2804 *
2805 */
2807{
2808 fr_dlist_remove(slog->log_head, slog);
2809
2810 return 0;
2811}
2812
2813void trunk_request_state_log_entry_add(char const *function, int line,
2815{
2816 trunk_request_state_log_t *slog = NULL;
2817
2819 slog = fr_dlist_head(&treq->log);
2820 fr_assert_msg(slog, "slog list head NULL but element counter was %u",
2821 fr_dlist_num_elements(&treq->log));
2822 (void)fr_dlist_remove(&treq->log, slog); /* Returns NULL when removing the list head */
2823 memset(slog, 0, sizeof(*slog));
2824 } else {
2825 MEM(slog = talloc_zero(treq, trunk_request_state_log_t));
2826 talloc_set_destructor(slog, _state_log_entry_free);
2827 }
2828
2829 slog->log_head = &treq->log;
2830 slog->from = treq->pub.state;
2831 slog->to = new;
2832 slog->function = function;
2833 slog->line = line;
2834 if (treq->pub.tconn) {
2835 slog->tconn = treq->pub.tconn;
2836 slog->tconn_id = treq->pub.tconn->pub.conn->id;
2837 slog->tconn_state = treq->pub.tconn->pub.state;
2838 }
2839
2840 fr_dlist_insert_tail(&treq->log, slog);
2841
2842}
2843
2844void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line,
2845 trunk_request_t const *treq)
2846{
2847 trunk_request_state_log_t *slog = NULL;
2848
2849 int i;
2850
2851 for (slog = fr_dlist_head(&treq->log), i = 0;
2852 slog;
2853 slog = fr_dlist_next(&treq->log, slog), i++) {
2854 fr_log(log, log_type, file, line, "[%u] %s:%i - in conn %"PRIu64" in state %s - %s -> %s",
2855 i, slog->function, slog->line,
2856 slog->tconn_id,
2858 slog->tconn_state, "<INVALID>") : "none",
2859 fr_table_str_by_value(trunk_request_states, slog->from, "<INVALID>"),
2860 fr_table_str_by_value(trunk_request_states, slog->to, "<INVALID>"));
2861 }
2862}
2863#endif
2864
2865/** Return the number of connections in the specified states
2866 *
2867 * @param[in] trunk to retrieve counts for.
2868 * @param[in] conn_state One or more #trunk_connection_state_t states or'd together.
2869 * @return The number of connections in the specified states.
2870 */
2872{
2873 uint16_t count = 0;
2874
2875 if (conn_state & TRUNK_CONN_INIT) count += fr_dlist_num_elements(&trunk->init);
2876 if (conn_state & TRUNK_CONN_CONNECTING) count += fr_dlist_num_elements(&trunk->connecting);
2877 if (conn_state & TRUNK_CONN_ACTIVE) count += fr_minmax_heap_num_elements(trunk->active);
2878 if (conn_state & TRUNK_CONN_FULL) count += fr_dlist_num_elements(&trunk->full);
2879 if (conn_state & TRUNK_CONN_INACTIVE) count += fr_dlist_num_elements(&trunk->inactive);
2881 if (conn_state & TRUNK_CONN_CLOSED) count += fr_dlist_num_elements(&trunk->closed);
2882 if (conn_state & TRUNK_CONN_DRAINING) count += fr_dlist_num_elements(&trunk->draining);
2884
2885 return count;
2886}
2887
2888/** Return the number of requests associated with a trunk connection
2889 *
2890 * @param[in] tconn to return request count for.
2891 * @param[in] req_state One or more request states or'd together.
2892 *
2893 * @return The number of requests in the specified states, associated with a tconn.
2894 */
2896{
2897 uint32_t count = 0;
2898
2900 if (req_state & TRUNK_REQUEST_STATE_PARTIAL) count += tconn->partial ? 1 : 0;
2901 if (req_state & TRUNK_REQUEST_STATE_SENT) count += fr_dlist_num_elements(&tconn->sent);
2903 if (req_state & TRUNK_REQUEST_STATE_CANCEL) count += fr_dlist_num_elements(&tconn->cancel);
2904 if (req_state & TRUNK_REQUEST_STATE_CANCEL_PARTIAL) count += tconn->cancel_partial ? 1 : 0;
2906
2907 return count;
2908}
2909
2910/** Automatically mark a connection as full
2911 *
2912 * @param[in] tconn to potentially mark as full.
2913 */
2915{
2916 trunk_t *trunk = tconn->pub.trunk;
2918
2919 if (tconn->pub.state != TRUNK_CONN_ACTIVE) return;
2920
2921 /*
2922 * Enforces max_req_per_conn
2923 */
2924 if (trunk->conf.max_req_per_conn > 0) {
2927 }
2928}
2929
2930/** Return whether a trunk connection should currently be considered full
2931 *
2932 * @param[in] tconn to check.
2933 * @return
2934 * - true if the connection is full.
2935 * - false if the connection is not full.
2936 */
2938{
2939 trunk_t *trunk = tconn->pub.trunk;
2941
2942 /*
2943 * Enforces max_req_per_conn
2944 */
2946 if ((trunk->conf.max_req_per_conn == 0) || (count < trunk->conf.max_req_per_conn)) return false;
2947
2948 return true;
2949}
2950
2951/** Automatically mark a connection as active or reconnect it
2952 *
2953 * @param[in] tconn to potentially mark as active or reconnect.
2954 */
2956{
2957 if (tconn->pub.state != TRUNK_CONN_FULL) return;
2958
2959 /*
2960 * Enforces max_req_per_conn
2961 */
2963}
2964
2965/** A connection is readable. Call the request_demux function to read pending requests
2966 *
2967 */
2969{
2970 trunk_t *trunk = tconn->pub.trunk;
2971
2972 DO_REQUEST_DEMUX(tconn);
2973}
2974
2975/** A connection is writable. Call the request_mux function to write pending requests
2976 *
2977 */
2979{
2980 trunk_t *trunk = tconn->pub.trunk;
2981
2982 /*
2983 * Call the cancel_sent function (if we have one)
2984 * to inform a backend datastore we no longer
2985 * care about the result
2986 */
2990 DO_REQUEST_CANCEL_MUX(tconn);
2991 }
2995 DO_REQUEST_MUX(tconn);
2996}
2997
2998/** Update the registrations for I/O events we're interested in
2999 *
3000 */
3002{
3003 trunk_t *trunk = tconn->pub.trunk;
3005
3006 switch (tconn->pub.state) {
3007 /*
3008 * We only register I/O events if the trunk connection is
3009 * in one of these states.
3010 *
3011 * For the other states the trunk shouldn't be processing
3012 * requests.
3013 */
3014 case TRUNK_CONN_ACTIVE:
3015 case TRUNK_CONN_FULL:
3020 /*
3021 * If the connection is always writable,
3022 * then we don't care about write events.
3023 */
3024 if (!trunk->conf.always_writable &&
3028 (trunk->funcs.request_cancel_mux ?
3032 }
3033
3036 (trunk->funcs.request_cancel_mux ?
3039 }
3040 break;
3041
3042 default:
3043 break;
3044 }
3045
3046 if (tconn->events != events) {
3047 /*
3048 * There may be a fatal error which results
3049 * in the connection being freed.
3050 *
3051 * Stop that from happening until after
3052 * we're done using it.
3053 */
3056 tconn->events = events;
3058 }
3059}
3060
3061/** Remove a trunk connection from whichever list it's currently in
3062 *
3063 * @param[in] tconn to remove.
3064 */
3066{
3067 trunk_t *trunk = tconn->pub.trunk;
3068
3069 switch (tconn->pub.state) {
3070 case TRUNK_CONN_ACTIVE:
3071 {
3072 int ret;
3073
3074 ret = fr_minmax_heap_extract(trunk->active, tconn);
3075 if (!fr_cond_assert_msg(ret == 0, "Failed extracting conn from active heap: %s", fr_strerror())) return;
3076 }
3077 return;
3078
3079 case TRUNK_CONN_INIT:
3080 fr_dlist_remove(&trunk->init, tconn);
3081 break;
3082
3084 fr_dlist_remove(&trunk->connecting, tconn);
3085 return;
3086
3087 case TRUNK_CONN_CLOSED:
3088 fr_dlist_remove(&trunk->closed, tconn);
3089 return;
3090
3091 case TRUNK_CONN_FULL:
3092 fr_dlist_remove(&trunk->full, tconn);
3093 return;
3094
3096 fr_dlist_remove(&trunk->inactive, tconn);
3097 return;
3098
3100 fr_dlist_remove(&trunk->inactive_draining, tconn);
3101 return;
3102
3104 fr_dlist_remove(&trunk->draining, tconn);
3105 return;
3106
3108 fr_dlist_remove(&trunk->draining_to_free, tconn);
3109 return;
3110
3111 case TRUNK_CONN_HALTED:
3112 return;
3113 }
3114}
3115
3116/** Transition a connection to the full state
3117 *
3118 * Called whenever a trunk connection is at the maximum number of requests.
3119 * Removes the connection from the connected heap, and places it in the full list.
3120 */
3122{
3123 trunk_t *trunk = tconn->pub.trunk;
3124
3125 switch (tconn->pub.state) {
3126 case TRUNK_CONN_ACTIVE:
3128 break;
3129
3130 default:
3132 }
3133
3134 fr_dlist_insert_head(&trunk->full, tconn);
3136}
3137
3138/** Transition a connection to the inactive state
3139 *
3140 * Called whenever the API client wants to stop new requests being enqueued
3141 * on a trunk connection.
3142 */
3144{
3145 trunk_t *trunk = tconn->pub.trunk;
3146
3147 switch (tconn->pub.state) {
3148 case TRUNK_CONN_ACTIVE:
3149 case TRUNK_CONN_FULL:
3151 break;
3152
3153 default:
3155 }
3156
3157 fr_dlist_insert_head(&trunk->inactive, tconn);
3159}
3160
3161/** Transition a connection to the inactive-draining state
3162 *
3163 * Called whenever the trunk manager wants to drain an inactive connection
3164 * of its requests.
3165 */
3167{
3168 trunk_t *trunk = tconn->pub.trunk;
3169
3170 switch (tconn->pub.state) {
3174 break;
3175
3176 default:
3178 }
3179
3182
3183 /*
3184 * Immediately re-enqueue all pending
3185 * requests, so the connection is drained
3186 * quicker.
3187 */
3189}
3190
3191/** Transition a connection to the draining state
3192 *
3193 * Removes the connection from the active heap so it won't be assigned any new
3194 * connections.
3195 */
3197{
3198 trunk_t *trunk = tconn->pub.trunk;
3199
3200 switch (tconn->pub.state) {
3201 case TRUNK_CONN_ACTIVE:
3202 case TRUNK_CONN_FULL:
3206 break;
3207
3208 default:
3210 }
3211
3212 fr_dlist_insert_head(&trunk->draining, tconn);
3214
3215 /*
3216 * Immediately re-enqueue all pending
3217 * requests, so the connection is drained
3218 * quicker.
3219 */
3221}
3222
3223/** Transition a connection to the draining-to-reconnect state
3224 *
3225 * Removes the connection from the active heap so it won't be assigned any new
3226 * connections.
3227 */
3229{
3230 trunk_t *trunk = tconn->pub.trunk;
3231
3233
3234 switch (tconn->pub.state) {
3235 case TRUNK_CONN_ACTIVE:
3236 case TRUNK_CONN_FULL:
3241 break;
3242
3243 default:
3245 }
3246
3247 fr_dlist_insert_head(&trunk->draining_to_free, tconn);
3249
3250 /*
3251 * Immediately re-enqueue all pending
3252 * requests, so the connection is drained
3253 * quicker.
3254 */
3256}
3257
3258
3259/** Transition a connection back to the active state
3260 *
3261 * This should only be called on a connection which is in the full state,
3262 * inactive state, draining state or connecting state.
3263 */
3265{
3266 trunk_t *trunk = tconn->pub.trunk;
3267 int ret;
3268
3269 switch (tconn->pub.state) {
3270 case TRUNK_CONN_FULL:
3275 break;
3276
3277 case TRUNK_CONN_INIT:
3281 break;
3282
3283 default:
3285 }
3286
3287 ret = fr_minmax_heap_insert(trunk->active, tconn); /* re-insert into the active heap*/
3288 if (!fr_cond_assert_msg(ret == 0, "Failed inserting connection into active heap: %s", fr_strerror())) {
3290 return;
3291 }
3292
3294
3295 /*
3296 * Reorder the connections
3297 */
3298 CONN_REORDER(tconn);
3299
3300 /*
3301 * Rebalance requests
3302 */
3303 trunk_rebalance(trunk);
3304
3305 /*
3306 * We place requests into the backlog
3307 * because there were no connections
3308 * available to handle them.
3309 *
3310 * If a connection has become active
3311 * chances are those backlogged requests
3312 * can now be enqueued, so try and do
3313 * that now.
3314 *
3315 * If there's requests sitting in the
3316 * backlog indefinitely, it's because
3317 * they were inserted there erroneously
3318 * when there were active connections
3319 * which could have handled them.
3320 */
3321 trunk_backlog_drain(trunk);
3322}
3323
3324/** Connection transitioned to the init state
3325 *
3326 * Reflect the connection state change in the lists we use to track connections.
3327 *
3328 * @note This function is only called from the connection API as a watcher.
3329 *
3330 * @param[in] conn The connection which changes state.
3331 * @param[in] prev The state the connection was in.
3332 * @param[in] state The state the connection is now in.
3333 * @param[in] uctx The trunk_connection_t wrapping the connection.
3334 */
3338 void *uctx)
3339{
3340 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3341 trunk_t *trunk = tconn->pub.trunk;
3342
3343 switch (tconn->pub.state) {
3344 case TRUNK_CONN_HALTED:
3345 break;
3346
3347 case TRUNK_CONN_CLOSED:
3349 break;
3350
3351 default:
3353 }
3354
3355 fr_dlist_insert_head(&trunk->init, tconn);
3357}
3358
3359/** Connection transitioned to the connecting state
3360 *
3361 * Reflect the connection state change in the lists we use to track connections.
3362 *
3363 * @note This function is only called from the connection API as a watcher.
3364 *
3365 * @param[in] conn The connection which changes state.
3366 * @param[in] prev The state the connection was in.
3367 * @param[in] state The state the connection is now in.
3368 * @param[in] uctx The trunk_connection_t wrapping the connection.
3369 */
3373 void *uctx)
3374{
3375 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3376 trunk_t *trunk = tconn->pub.trunk;
3377
3378 switch (tconn->pub.state) {
3379 case TRUNK_CONN_INIT:
3380 case TRUNK_CONN_CLOSED:
3382 break;
3383
3384 default:
3386 }
3387
3388 /*
3389 * If a connection just entered the
3390 * connecting state, it should have
3391 * no requests associated with it.
3392 */
3394
3395 fr_dlist_insert_head(&trunk->connecting, tconn); /* MUST remain a head insertion for reconnect logic */
3397}
3398
3399/** Connection transitioned to the shutdown state
3400 *
3401 * If we're not already in the draining-to-free state, transition there now.
3402 *
3403 * The idea is that if something signalled the connection to shutdown, we need
3404 * to reflect that by dequeuing any pending requests, not accepting new ones,
3405 * and waiting for the existing requests to complete.
3406 *
3407 * @note This function is only called from the connection API as a watcher.
3408 *
3409 * @param[in] conn The connection which changes state.
3410 * @param[in] prev The state the connection was in.
3411 * @param[in] state The state the connection is now in.
3412 * @param[in] uctx The trunk_connection_t wrapping the connection.
3413 */
3417 void *uctx)
3418{
3419 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3420
3421 switch (tconn->pub.state) {
3422 case TRUNK_CONN_DRAINING_TO_FREE: /* Do Nothing */
3423 return;
3424
3425 case TRUNK_CONN_ACTIVE: /* Transition to draining-to-free */
3426 case TRUNK_CONN_FULL:
3430 break;
3431
3432 case TRUNK_CONN_INIT:
3434 case TRUNK_CONN_CLOSED:
3435 case TRUNK_CONN_HALTED:
3437 }
3438
3440}
3441
3442/** Trigger a reconnection of the trunk connection
3443 *
3444 * @param[in] tl timer list the timer was inserted into.
3445 * @param[in] now Current time.
3446 * @param[in] uctx The tconn.
3447 */
3449{
3450 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3451
3453}
3454
3455/** Connection transitioned to the connected state
3456 *
3457 * Reflect the connection state change in the lists we use to track connections.
3458 *
3459 * @note This function is only called from the connection API as a watcher.
3460 *
3461 * @param[in] conn The connection which changes state.
3462 * @param[in] prev The state the connection was in.
3463 * @param[in] state The state the connection is now in.
3464 * @param[in] uctx The trunk_connection_t wrapping the connection.
3465 */
3469 void *uctx)
3470{
3471 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3472 trunk_t *trunk = tconn->pub.trunk;
3473
3474 /*
3475 * If a connection was just connected, it should only
3476 * have a pending list of requests. This state is found
3477 * in the rlm_radius module, which starts a new trunk,
3478 * and then immediately enqueues a request onto it. The
3480 * alternative for rlm_radius is to keep its own queue
3481 * of pending requests before the trunk is fully
3482 * initialized, and then enqueue them onto the trunk
3482 * when the trunk is connected.
3483 *
3484 * It's instead easier (and makes more sense) to allow
3485 * the trunk to accept packets into its queue. If there
3486 * are no connections within a period of time, then the
3487 * requests will retry, or will time out.
3488 */
3490
3491 /*
3492 * Set here, as the active state can
3493 * be transitioned to from full and
3494 * draining too.
3495 */
3496 trunk->pub.last_connected = fr_time();
3497
3498 /*
3499 * Insert a timer to reconnect the
3500 * connection periodically.
3501 */
3502 if (fr_time_delta_ispos(trunk->conf.lifetime)) {
3503 if (fr_timer_in(tconn, trunk->el->tl, &tconn->lifetime_ev,
3504 trunk->conf.lifetime, false, _trunk_connection_lifetime_expire, tconn) < 0) {
3505 PERROR("Failed inserting connection reconnection timer event, halting connection");
3507 return;
3508 }
3509 }
3510
3512}
3513
3514/** Connection failed after it was connected
3515 *
3516 * Reflect the connection state change in the lists we use to track connections.
3517 *
3518 * @note This function is only called from the connection API as a watcher.
3519 *
3520 * @param[in] conn The connection which changes state.
3521 * @param[in] prev The state the connection was in.
3522 * @param[in] state The state the connection is now in.
3523 * @param[in] uctx The trunk_connection_t wrapping the connection.
3524 */
3528 void *uctx)
3529{
3530 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3531 trunk_t *trunk = tconn->pub.trunk;
3532 bool need_requeue = false;
3533
3534 switch (tconn->pub.state) {
3535 case TRUNK_CONN_ACTIVE:
3536 case TRUNK_CONN_FULL:
3541 need_requeue = true;
3543 break;
3544
3545 case TRUNK_CONN_INIT: /* Initialisation failed */
3549 break;
3550
3551 case TRUNK_CONN_CLOSED:
3552 case TRUNK_CONN_HALTED: /* Can't move backwards? */
3554 }
3555
3556 fr_dlist_insert_head(&trunk->closed, tconn); /* MUST remain a head insertion for reconnect logic */
3558
3559 /*
3560 * Now *AFTER* the connection has been
3561 * removed from the active pool,
3562 * re-enqueue the requests.
3563 */
3564 if (need_requeue) trunk_connection_requests_requeue_priv(tconn, TRUNK_REQUEST_STATE_ALL, 0, true);
3565
3566 /*
3567 * There should be no requests left on this
3568 * connection. They should have all been
3569 * moved off or failed.
3570 */
3572
3573 /*
3574 * Clear statistics and flags
3575 */
3576 tconn->sent_count = 0;
3577
3578 /*
3579 * Remove the reconnect event
3580 */
3582
3583 /*
3584 * Remove the I/O events
3585 */
3587}
3588
3589/** Connection failed
3590 *
3591 * @param[in] conn The connection which changes state.
3592 * @param[in] prev The state the connection was in.
3593 * @param[in] state The state the connection is now in.
3594 * @param[in] uctx The trunk_connection_t wrapping the connection.
3595 */
3597 connection_state_t prev,
3599 void *uctx)
3600{
3601 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3602 trunk_t *trunk = tconn->pub.trunk;
3603
3604 /*
3605 * Need to set this first as it
3606 * determines whether requests are
3607 * re-queued or fail outright.
3608 */
3609 trunk->pub.last_failed = fr_time();
3610
3611 /*
3612 * Failed in the init state, transition the
3613 * connection to closed, else we get an
3614 * INIT -> INIT transition which triggers
3615 * an assert.
3616 */
3617 if (prev == CONNECTION_STATE_INIT) _trunk_connection_on_closed(conn, prev, state, uctx);
3618
3619 /*
3620 * See what the state of the trunk is.
3621 * If there are no connections that could
3622 * potentially accept requests in the near
3623 * future, then fail all the requests in the
3624 * trunk backlog.
3625 */
3631}
3632
3633/** Connection transitioned to the halted state
3634 *
3635 * Remove the connection from all lists, as it's likely about to be freed.
3636 *
3637 * Setting the trunk back to the init state ensures that if the code is ever
3638 * refactored and #connection_signal_reconnect is used after a connection
3639 * is halted, then everything is maintained in a valid state.
3640 *
3641 * @note This function is only called from the connection API as a watcher.
3642 *
3643 * @param[in] conn The connection which changes state.
3644 * @param[in] prev The state the connection was in.
3645 * @param[in] state The state the connection is now in.
3646 * @param[in] uctx The trunk_connection_t wrapping the connection.
3647 */
3651 void *uctx)
3652{
3653 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
3654 trunk_t *trunk = tconn->pub.trunk;
3655
3656 switch (tconn->pub.state) {
3657 case TRUNK_CONN_INIT:
3658 case TRUNK_CONN_CLOSED:
3660 break;
3661
3662 default:
3664 }
3665
3666 /*
3667 * It began life in the halted state,
3668 * and will end life in the halted state.
3669 */
3671
3672 /*
3673 * There should be no requests left on this
3674 * connection. They should have all been
3675 * moved off or failed.
3676 */
3678
3679 /*
3680 * And free the connection...
3681 */
3682 if (trunk->in_handler) {
3683 /*
3684 * ...later.
3685 */
3686 fr_dlist_insert_tail(&trunk->to_free, tconn);
3687 return;
3688 }
3689 talloc_free(tconn);
3690}
3691
3692/** Free a connection
3693 *
3694 * Enforces orderly free order of children of the tconn
3695 */
3697{
3699 fr_assert(!fr_dlist_entry_in_list(&tconn->entry)); /* Should not be in a list */
3700
3701 /*
3702 * Loop over all the requests we gathered
3703 * and transition them to the failed state,
3704 * freeing them.
3705 *
3706 * Usually, requests will be re-queued when
3707 * a connection enters the closed state,
3708 * but in this case because the whole trunk
3709 * is being freed, we don't bother, and
3710 * just signal to the API client that the
3711 * requests failed.
3712 */
3713 if (tconn->pub.trunk->freeing) {
3714 fr_dlist_head_t to_fail;
3715 trunk_request_t *treq = NULL;
3716
3717 fr_dlist_talloc_init(&to_fail, trunk_request_t, entry);
3718
3719 /*
3720 * Remove requests from this connection
3721 */
3723 while ((treq = fr_dlist_next(&to_fail, treq))) {
3724 trunk_request_t *prev;
3725
3726 prev = fr_dlist_remove(&to_fail, treq);
3728 treq = prev;
3729 }
3730 }
3731
3732 /*
3733 * Ensure we're not signalled by the connection
3734 * as it processes its backlog of state changes,
3735 * as we are about to be freed.
3736 */
3744
3745 /*
3746 * This may return -1, indicating the free was deferred
3747 * this is fine. It just means the conn will be freed
3748 * after all the handlers have exited.
3749 */
3750 (void)talloc_free(tconn->pub.conn);
3751 tconn->pub.conn = NULL;
3752
3753 return 0;
3754}
3755
3756/** Attempt to spawn a new connection
3757 *
3758 * Calls the API client's alloc() callback to create a new connection_t,
3759 * then inserts the connection into the 'connecting' list.
3760 *
3761 * @param[in] trunk to spawn connection in.
3762 * @param[in] now The current time.
3763 */
3765{
3766 trunk_connection_t *tconn;
3767
3768
3769 /*
3770 * Call the API client's callback to create
3771 * a new connection_t.
3772 */
3773 MEM(tconn = talloc_zero(trunk, trunk_connection_t));
3774 tconn->pub.trunk = trunk;
3775 tconn->pub.state = TRUNK_CONN_HALTED; /* All connections start in the halted state */
3776
3777 /*
3778 * Allocate a new connection_t or fail.
3779 */
3780 DO_CONNECTION_ALLOC(tconn);
3781
3783 fr_dlist_talloc_init(&tconn->sent, trunk_request_t, entry);
3787
3788 /*
3789 * OK, we have the connection, now setup watch
3790 * points so we know when it changes state.
3791 *
3792 * This lets us automatically move the tconn
3793 * between the different lists in the trunk
3794 * with minimum extra code.
3795 */
3797 _trunk_connection_on_init, false, tconn); /* Before init() has been called */
3798
3800 _trunk_connection_on_connecting, false, tconn); /* After init() has been called */
3801
3803 _trunk_connection_on_connected, false, tconn); /* After open() has been called */
3804
3806 _trunk_connection_on_closed, false, tconn); /* Before close() has been called */
3807
3809 _trunk_connection_on_failed, false, tconn); /* Before failed() has been called */
3810
3812 _trunk_connection_on_shutdown, false, tconn); /* After shutdown() has been called */
3813
3815 _trunk_connection_on_halted, false, tconn); /* About to be freed */
3816
3817 talloc_set_destructor(tconn, _trunk_connection_free);
3818
3819 connection_signal_init(tconn->pub.conn); /* annnnd GO! */
3820
3821 trunk->pub.last_open = now;
3822
3823 return 0;
3824}
3825
3826/** Pop a cancellation request off a connection's cancellation queue
3827 *
3828 * The request we return is advanced by the request moving out of the
3829 * cancel state and into the cancel_sent or cancel_complete state.
3830 *
3831 * One of these signalling functions must be called after the request
3832 * has been popped:
3833 *
3834 * - #trunk_request_signal_cancel_sent
3835 * The remote datastore has been informed, but we need to wait for acknowledgement.
3836 * The #trunk_request_demux_t callback must handle the acks calling
3837 * #trunk_request_signal_cancel_complete when an ack is received.
3838 *
3839 * - #trunk_request_signal_cancel_complete
3840 * The request was cancelled and we don't need to wait, clean it up immediately.
3841 *
3842 * @param[out] treq_out to process
3843 * @param[in] tconn Connection to drain cancellation request from.
3844 * @return
3845 * - 1 if no more requests.
3846 * - 0 if a new request was written to treq_out.
3847 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3848 * memory or requests associated with the connection.
3849 * - -2 if called outside of the cancel muxer.
3850 */
3852{
3853 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3854
3856 "%s can only be called from within request_cancel_mux handler",
3857 __FUNCTION__)) return -2;
3858
3859 *treq_out = tconn->cancel_partial ? tconn->cancel_partial : fr_dlist_head(&tconn->cancel);
3860 if (!*treq_out) return 1;
3861
3862 return 0;
3863}
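
/*
 *	Illustrative sketch, not part of trunk.c: the core of a
 *	request_cancel_mux callback.  The name of the pop function defined
 *	above (trunk_connection_pop_cancellation) is an assumption, as its
 *	declaration is elided in this listing; my_handle_t and
 *	my_write_cancel() stand in for protocol-specific I/O.
 */
static void my_cancel_mux(trunk_connection_t *tconn, my_handle_t *h)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_cancellation(&treq, tconn) == 0) {
		if (my_write_cancel(h, treq) < 0) break;	/* Try again when next writable */

		/*
		 *	Wait for the remote datastore to acknowledge the
		 *	cancellation.  Our request_demux callback will see
		 *	the ack and call trunk_request_signal_cancel_complete().
		 */
		trunk_request_signal_cancel_sent(treq);
	}
}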
3864
3865/** Pop a request off a connection's pending queue
3866 *
3867 * The request we return is advanced by the request moving out of the partial or
3868 * pending states, when the mux function signals us.
3869 *
3870 * If the same request is returned again and again, it means the muxer isn't actually
3871 * doing anything with the request we returned, and it's an error in the muxer code.
3872 *
3873 * One of these signalling functions must be used after the request has been popped:
3874 *
3875 * - #trunk_request_signal_complete
3876 * The request was completed. Either we got a synchronous response, or we knew the
3877 * response without contacting an external server (cache).
3878 *
3879 * - #trunk_request_signal_fail
3880 * Failed muxing the request due to a permanent issue, i.e. an invalid request.
3881 *
3882 * - #trunk_request_signal_partial
3883 * Wrote part of a request. This request will be returned on the next call to this
3884 * function so that the request_mux function can finish writing it. Only useful
3885 * for stream type connections. Datagram type connections cannot have partial
3886 * writes.
3887 *
3888 * - #trunk_request_signal_sent Successfully sent a request.
3889 *
3890 * @param[out] treq_out to process
3891 * @param[in] tconn to pop a request from.
3892 * @return
3893 * - 1 if no more requests.
3894 * - 0 if a new request was written to treq_out.
3895 * - -1 if the connection was previously freed. Caller *MUST NOT* touch any
3896 * memory or requests associated with the connection.
3897 * - -2 if called outside of the muxer.
3898 */
3900{
3901 if (unlikely(tconn->pub.state == TRUNK_CONN_HALTED)) return -1;
3902
3904 "%s can only be called from within request_mux handler",
3905 __FUNCTION__)) return -2;
3906
3907 *treq_out = tconn->partial ? tconn->partial : fr_heap_peek(tconn->pending);
3908 if (!*treq_out) return 1;
3909
3910 return 0;
3911}
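
/*
 *	Illustrative sketch, not part of trunk.c: the core of a request_mux
 *	callback for a stream socket.  The name of the pop function defined
 *	above (trunk_connection_pop_request) is an assumption, as its
 *	declaration is elided in this listing; my_handle_t, my_write_request()
 *	and the MY_WRITE_* results are stand-ins for protocol-specific I/O.
 */
static void my_request_mux(trunk_connection_t *tconn, my_handle_t *h)
{
	trunk_request_t *treq;

	while (trunk_connection_pop_request(&treq, tconn) == 0) {
		switch (my_write_request(h, treq)) {
		case MY_WRITE_PARTIAL:
			trunk_request_signal_partial(treq);	/* Returned again on the next call */
			return;

		case MY_WRITE_ERROR:
			trunk_request_signal_fail(treq);	/* Permanent error, fail this request */
			continue;

		default:
			trunk_request_signal_sent(treq);	/* request_demux will match the reply */
			break;
		}
	}
}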
3912
3913/** Signal that a trunk connection is writable
3914 *
3915 * Should be called from the 'write' I/O handler to signal that requests can be enqueued.
3916 *
3917 * @param[in] tconn to signal.
3918 */
3920{
3921 trunk_t *trunk = tconn->pub.trunk;
3922
3923 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3924 "%s cannot be called within a handler", __FUNCTION__)) return;
3925
3926 DEBUG3("[%" PRIu64 "] Signalled writable", tconn->pub.conn->id);
3927
3929}
3930
3931/** Signal that a trunk connection is readable
3932 *
3933 * Should be called from the 'read' I/O handler to signal that requests should be dequeued.
3934 *
3935 * @param[in] tconn to signal.
3936 */
3938{
3939 trunk_t *trunk = tconn->pub.trunk;
3940
3941 if (!fr_cond_assert_msg(!IN_HANDLER(tconn->pub.trunk),
3942 "%s cannot be called within a handler", __FUNCTION__)) return;
3943
3944 DEBUG3("[%" PRIu64 "] Signalled readable", tconn->pub.conn->id);
3945
3947}
3948
3949/** Signal a trunk connection cannot accept more requests
3950 *
3951 * @param[in] tconn to signal.
3952 */
3954{
3955 /* Can be called anywhere */
3956
3957 switch (tconn->pub.state) {
3958 case TRUNK_CONN_ACTIVE:
3959 case TRUNK_CONN_FULL:
3961 break;
3962
3965 break;
3966
3967 default:
3968 return;
3969 }
3970}
3971
3972/** Signal a trunk connection is no longer full
3973 *
3974 * @param[in] tconn to signal.
3975 */
3977{
3978 switch (tconn->pub.state) {
3979 case TRUNK_CONN_FULL:
3980 trunk_connection_auto_unfull(tconn); /* Mark as active if it should be active */
3981 break;
3982
3984 /*
3985 * Do the appropriate state transition based on
3986 * how many requests the trunk connection is
3987 * currently servicing.
3988 */
3989 if (trunk_connection_is_full(tconn)) {
3991 break;
3992 }
3994 break;
3995
3996 /*
3997 * Unsetting the active flag just moves
3998 * the connection back to the normal
3999 * draining state.
4000 */
4001 case TRUNK_CONN_INACTIVE_DRAINING: /* Only an external signal can trigger this transition */
4003 break;
4004
4005 default:
4006 return;
4007 }
4008}
4009
4010/** Signal a trunk connection is no longer viable
4011 *
4012 * @param[in] tconn to signal.
4013 * @param[in] reason the connection is being reconnected.
4014 */
4019
4020/** Standard I/O read function
4021 *
4022 * Underlying FD in now readable, so call the trunk to read any pending requests
4023 * from this connection.
4024 *
4025 * @param[in] el The event list signalling.
4026 * @param[in] fd that's now readable.
4027 * @param[in] flags describing the read event.
4028 * @param[in] uctx The trunk connection handle (tconn).
4029 */
4031{
4032 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4033
4035}
4036
4037/** Standard I/O write function
4038 *
4039 * Underlying FD is now writable, so call the trunk to write any pending requests
4040 * to this connection.
4041 *
4042 * @param[in] el The event list signalling.
4043 * @param[in] fd that's now writable.
4044 * @param[in] flags describing the write event.
4045 * @param[in] uctx The trunk connection handle (tconn).
4046 */
4048{
4049 trunk_connection_t *tconn = talloc_get_type_abort(uctx, trunk_connection_t);
4050
4052}
4053
4054
4055/** Returns true if the trunk connection is in one of the specified states
4056 *
4057 * @param[in] tconn To check state for.
4058 * @param[in] state to check
4059 * @return
4060 * - True if trunk connection is in a particular state.
4061 * - False if trunk connection is not in a particular state.
4062 */
4064{
4065 return (bool)(tconn->pub.state & state);
4066}
4067
4068/** Close connections in a particular connection list if they have no requests associated with them
4069 *
4070 * @param[in] trunk containing connections we want to close.
4071 * @param[in] head of list of connections to examine.
4072 */
4074{
4075 trunk_connection_t *tconn = NULL;
4076
4077 while ((tconn = fr_dlist_next(head, tconn))) {
4078 trunk_connection_t *prev;
4079
4081
4082 prev = fr_dlist_prev(head, tconn);
4083
4084 DEBUG3("Closing %s connection with no requests",
4086 /*
4087 * Close the connection as gracefully
4088 * as possible by signalling it should
4089 * shutdown.
4090 *
4091 * The connection, should, if serviced
4092 * correctly by the underlying library,
4093 * automatically transition to halted after
4094 * all pending reads/writes are
4095 * complete at which point we'll be informed
4096 * and free our tconn wrapper.
4097 */
4099 tconn = prev;
4100 }
4101}
4102
4103/** Rebalance connections across active trunk members when a new connection becomes active
4104 *
4105 * We don't have any visibility into the connection prioritisation algorithm;
4106 * it's essentially a black box.
4107 *
4108 * We can however determine when the correct level of requests per connection
4109 * has been reached, by dequeuing and requeueing requests up until the point
4110 * where the connection that just had a request dequeued receives the same
4111 * request back.
4112 *
4113 * @param[in] trunk The trunk to rebalance.
4114 */
4115static void trunk_rebalance(trunk_t *trunk)
4116{
4118
4120
4121 /*
4122 * Only rebalance if the top and bottom of
4123 * the heap are not equal.
4124 */
4125 if (trunk->funcs.connection_prioritise(fr_minmax_heap_max_peek(trunk->active), head) == 0) return;
4126
4127 DEBUG3("Rebalancing requests");
4128
4129 /*
4130 * Keep requeuing requests from the connection
4131 * at the bottom of the heap until the
4132 * connection at the top is shifted from that
4133 * position.
4134 */
4135 while ((fr_minmax_heap_min_peek(trunk->active) == head) &&
4136 trunk_connection_requests_requeue(fr_minmax_heap_min_peek(trunk->active),
4137 TRUNK_REQUEST_STATE_PENDING, 1, false));
4138}
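/*
 *	Illustration (hedged, not from the original source): with three active
 *	connections at pending depths 5, 3 and 1, each pass of the loop above
 *	moves one TRUNK_REQUEST_STATE_PENDING request off the connection
 *	currently at the head of the heap and re-enqueues it, letting the
 *	prioritisation callback place it on whichever connection it now
 *	favours.  The loop stops as soon as the head of the heap changes, or
 *	the requeue moves nothing, i.e. the connection being drained would
 *	otherwise receive its own request straight back.
 */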
4139
4140/** Implements the algorithm we use to manage requests per connection levels
4141 *
4142 * This is executed periodically using a timer event, and opens/closes
4143 * connections.
4144 *
4145 * The aim is to try and keep the request per connection level in a sweet spot,
4146 * where there's enough outstanding work for the connection/pipelining to work
4147 * efficiently, but not so much so that we encounter increased latency.
4148 *
4149 * In the request enqueue and dequeue functions we record every time the
4150 * average number of requests per connection goes above the target count,
4151 * and every time the average number of requests per connection goes
4152 * below the target count.
4153 *
4154 * This may sound expensive, but in all cases we're just summing counters.
4155 * CPU time required does not increase with additional requests, only with
4156 * large numbers of connections.
4157 *
4158 * If we do encounter scaling issues, we can always maintain the counters
4159 * as aggregates as an optimisation later.
4160 *
4161 * If, when the management function runs, the trunk was most recently
4162 * above the target, we:
4163 * - Return if we've been in this state for a shorter period than 'open_delay'.
4164 * - Return if we're at max.
4165 * - Return if opening a new connection will take us below the load target.
4166 * - Return if we last opened a connection within 'open_delay'.
4167 * - Otherwise we attempt to open a new connection.
4168 *
4169 * If the trunk was most recently below the target, we:
4170 * - Return if we've been in this state for a shorter period than 'close_delay'.
4171 * - Return if we're at min.
4172 * - Return if we have no connections.
4173 * - Close a connection if min is 0, and we have no outstanding
4174 * requests. Then return.
4175 * - Return if closing a connection will take us above the load target.
4176 * - Return if we last closed a connection within 'close_delay'.
4177 * - Otherwise we move a connection to draining state.
4178 */
4179static void trunk_manage(trunk_t *trunk, fr_time_t now)
4180{
4181 trunk_connection_t *tconn = NULL;
4182 trunk_request_t *treq;
4183 uint32_t average = 0;
4184 uint32_t req_count;
4185 uint16_t conn_count;
4186 trunk_state_t new_state;
4187
4188 DEBUG4("Managing trunk");
4189
4190 /*
4191 * Cleanup requests in our request cache which
4192 * have been reapable for too long.
4193 */
4194 while ((treq = fr_dlist_tail(&trunk->free_requests)) &&
4196
4197 /*
4198 * If we have idle connections, then close them.
4199 */
4202 fr_time_t idle_cutoff = fr_time_sub(now, trunk->conf.idle_timeout);
4203
4204 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4205 tconn;
4206 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4207 /*
4208 * The connection has outstanding requests without replies, don't do anything.
4209 */
4210 if (fr_heap_num_elements(tconn->pending) > 0) continue;
4211
4212 /*
4213 * The connection was last active after the idle cutoff time, don't do anything.
4214 */
4215 if (fr_time_gt(tconn->pub.last_write_success, idle_cutoff)) continue;
4216
4217 /*
4218 * This connection has been inactive since before the idle timeout. Drain it,
4219 * and free it.
4220 */
4222 }
4223 }
4224
4225 /*
4226 * Free any connections which have drained
4227 * and we didn't reactivate during the last
4228 * round of management.
4229 */
4233
4234 /*
4235 * Process deferred connection freeing
4236 */
4237 if (!trunk->in_handler) {
4238 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4239 }
4240
4241 /*
4242 * Update the state of the trunk
4243 */
4245 new_state = TRUNK_STATE_ACTIVE;
4246 } else {
4247 /*
4248 * INIT / CONNECTING / FULL mean connections will become active
4249 * so the trunk is PENDING
4250 */
4255 }
4256
4257 if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
4258
4259 /*
4260 * A trunk can be signalled to not proactively
4261 * manage connections if a destination is known
4262 * to be unreachable, and doing so would result
4263 * in spurious connections still being opened.
4264 *
4265 * We still run other connection management
4266 * functions and just short circuit the function
4267 * here.
4268 */
4269 if (!trunk->managing_connections) return;
4270
4271 /*
4272 * We're above the target requests per connection
4273 * spawn more connections!
4274 */
4276 /*
4277 * If connecting is provided, check we
4278 * wouldn't have too many connections in
4279 * the connecting state.
4280 *
4281 * This is a throttle in the case of transitory
4282 * load spikes, or a backend becoming
4283 * unavailable.
4284 */
4285 if ((trunk->conf.connecting > 0) &&
4287 trunk->conf.connecting)) {
4288 DEBUG4("Not opening connection - Too many (%u) connections in the connecting state",
4289 trunk->conf.connecting);
4290 return;
4291 }
4292
4293 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4294
4295 /*
4296 * Only apply hysteresis if we have at least
4297 * one available connection.
4298 */
4299 if (conn_count && fr_time_gt(fr_time_add(trunk->pub.last_above_target, trunk->conf.open_delay), now)) {
4300 DEBUG4("Not opening connection - Need to be above target for %pVs. It's been %pVs",
4303 return; /* too soon */
4304 }
4305
4306 /*
4307 * We don't consider 'draining' connections
4308 * in the max calculation, as if we do
4310 * determine that we need to spawn a new
4311 * connection, then we'd move all 'draining'
4311 * connections to active before spawning
4312 * any new connections.
4313 */
4314 if ((trunk->conf.max > 0) && (conn_count >= trunk->conf.max)) {
4315 DEBUG4("Not opening connection - Have %u connections, need %u or below",
4316 conn_count, trunk->conf.max);
4317 return;
4318 }
4319
4320 /*
4321 * We consider requests pending on all connections
4322 * and the trunk's backlog as that's the current
4323 * load.
4324 */
4325 if (!req_count) {
4326 DEBUG4("Not opening connection - No outstanding requests");
4327 return;
4328 }
4329
4330 /*
4331 * Do the n+1 check, i.e. if we open one connection
4332 * will that take us below our target threshold.
4333 */
4334 if (conn_count > 0) {
4335 average = ROUND_UP_DIV(req_count, (conn_count + 1));
4336 if (average < trunk->conf.target_req_per_conn) {
4337 DEBUG4("Not opening connection - Would leave us below our target requests "
4338 "per connection (now %u, after open %u)",
4339 ROUND_UP_DIV(req_count, conn_count), average);
4340 return;
4341 }
4342 } else {
4343 (void)trunk_connection_spawn(trunk, now);
4344 return;
4345 }
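/*
 *	Worked example (illustrative, not from the original source):
 *	with req_count = 10, conn_count = 3 and target_req_per_conn = 2,
 *	the n+1 value is ROUND_UP_DIV(10, 4) = 3, still above target, so we
 *	carry on towards opening another connection.  With a target of 4 the
 *	same n+1 value falls below target and the check above returns instead.
 */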
4346
4347 /*
4348 * If we've got a connection in the draining list
4349 * move it back into the active list if we've
4350 * been requested to add a connection back in.
4351 */
4352 tconn = fr_dlist_head(&trunk->draining);
4353 if (tconn) {
4354 if (trunk_connection_is_full(tconn)) {
4356 } else {
4358 }
4359 return;
4360 }
4361
4362 /*
4363 * Implement delay if there are no connections that
4364 * could be immediately re-activated.
4365 */
4366 if (fr_time_gt(fr_time_add(trunk->pub.last_open, trunk->conf.open_delay), now)) {
4367 DEBUG4("Not opening connection - Need to wait %pVs before opening another connection. "
4368 "It's been %pVs",
4371 return;
4372 }
4373
4374 DEBUG4("Opening connection - Above target requests per connection (now %u, target %u)",
4375 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4376 /* last_open set by trunk_connection_spawn */
4377 (void)trunk_connection_spawn(trunk, now);
4378 }
4379
4380 /*
4381 * We're below the target requests per connection.
4382 * Free some connections...
4383 */
4384 else if (fr_time_gt(trunk->pub.last_below_target, trunk->pub.last_above_target)) {
4385 if (fr_time_gt(fr_time_add(trunk->pub.last_below_target, trunk->conf.close_delay), now)) {
4386 DEBUG4("Not closing connection - Need to be below target for %pVs. It's been %pVs",
4389 return; /* too soon */
4390 }
4391
4392 trunk_requests_per_connection(&conn_count, &req_count, trunk, now, true);
4393
4394 if (!conn_count) {
4395 DEBUG4("Not closing connection - No connections to close!");
4396 return;
4397 }
4398
4399 if ((trunk->conf.min > 0) && ((conn_count - 1) < trunk->conf.min)) {
4400 DEBUG4("Not closing connection - Have %u connections, need %u or above",
4401 conn_count, trunk->conf.min);
4402 return;
4403 }
4404
4405 if (!req_count) {
4406 DEBUG4("Closing connection - No outstanding requests");
4407 goto close;
4408 }
4409
4410 /*
4411 * The minimum number of connections must be set
4412 * to zero for this to work.
4413 * min == 0, no requests, close all the connections.
4414 * This is useful for backup databases, when
4415 * maintaining the connection would lead to lots of
4416 * log file churn.
4417 */
4418 if (conn_count == 1) {
4419 DEBUG4("Not closing connection - Would leave no connections "
4420 "and there are still %u outstanding requests", req_count);
4421 return;
4422 }
4423
4424 /*
4425 * Do the n-1 check, i.e. if we close one connection
4426 * will that take us above our target threshold.
4427 */
4428 average = ROUND_UP_DIV(req_count, (conn_count - 1));
4429 if (average > trunk->conf.target_req_per_conn) {
4430 DEBUG4("Not closing connection - Would leave us above our target requests per connection "
4431 "(now %u, after close %u)", ROUND_UP_DIV(req_count, conn_count), average);
4432 return;
4433 }
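/*
 *	Worked example (illustrative, not from the original source):
 *	with req_count = 10, conn_count = 3 and target_req_per_conn = 4,
 *	the n-1 value is ROUND_UP_DIV(10, 2) = 5, above target, so the check
 *	above returns and the connection is kept.  With a target of 6 the
 *	same value is within target and we proceed to close one.
 */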
4434
4435 DEBUG4("Closing connection - Below target requests per connection (now %u, target %u)",
4436 ROUND_UP_DIV(req_count, conn_count), trunk->conf.target_req_per_conn);
4437
4438 close:
4439 if (fr_time_gt(fr_time_add(trunk->pub.last_closed, trunk->conf.close_delay), now)) {
4440 DEBUG4("Not closing connection - Need to wait %pVs before closing another connection. "
4441 "It's been %pVs",
4444 return;
4445 }
4446
4447 /*
4448 * If the last event on the trunk was a connection failure and
4449 * there is only one connection, this may well be a reconnect
4450 * attempt after a failure. It needs to persist, otherwise
4451 * the last event will remain a failure, no new connection will
4452 * be made, and no new requests will be enqueued.
4453 */
4454 if (fr_time_gt(trunk->pub.last_failed, fr_time_wrap(0)) &&
4455 fr_time_lt(trunk->pub.last_connected, trunk->pub.last_failed) && (conn_count == 1)) {
4456 DEBUG4("Not closing remaining connection - last event was a failure");
4457 return;
4458 }
4459
4460 /*
4461 * Inactive connections get counted in the
4462 * set of viable connections, but are likely
4463 * to be congested or dead, so we drain
4464 * (and possibly eventually free) those first.
4465 */
4466 if ((tconn = fr_dlist_tail(&trunk->inactive))) {
4467 /*
4468 * If the connection has no requests associated
4469 * with it then immediately free.
4470 */
4472 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4473 } else {
4475 }
4476 /*
4477 * It is possible to have too many connecting
4478 * connections when the connections are
4479 * taking a while to open and the number
4480 * of requests decreases.
4481 */
4482 } else if ((tconn = fr_dlist_tail(&trunk->connecting))) {
4483 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4484
4485 /*
4486 * Finally if there are no "connecting"
4487 * connections to close, and no "inactive"
4488 * connections, start draining "active"
4489 * connections.
4490 */
4491 } else if ((tconn = fr_minmax_heap_max_peek(trunk->active))) {
4492 /*
4493 * If the connection has no requests associated
4494 * with it then immediately free.
4495 */
4497 connection_signal_halt(tconn->pub.conn); /* Also frees the tconn */
4498 } else {
4500 }
4501 }
4502
4503 trunk->pub.last_closed = now;
4504
4505
4506 return;
4507 }
4508}
4509
4510/** Event to periodically call the connection management function
4511 *
4512 * @param[in] tl this event belongs to.
4513 * @param[in] now current time.
4514 * @param[in] uctx The trunk.
4515 */
4516static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
4517{
4518 trunk_t *trunk = talloc_get_type_abort(uctx, trunk_t);
4519
4520 trunk_manage(trunk, now);
4521
4523 if (fr_timer_in(trunk, tl, &trunk->manage_ev, trunk->conf.manage_interval,
4524 false, _trunk_timer, trunk) < 0) {
4525 PERROR("Failed inserting trunk management event");
4526 /* Not much we can do, hopefully the trunk will be freed soon */
4527 }
4528 }
4529}
4530
4531/** Return a count of requests on a connection in a specific state
4532 *
4533 * @param[in] trunk to retrieve counts for.
4534 * @param[in] conn_state One or more connection states or'd together.
4535 * @param[in] req_state One or more request states or'd together.
4536 * @return The number of requests in a particular state, on connection in a particular state.
4537 */
4538uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
4539{
4540 uint64_t count = 0;
4541 trunk_connection_t *tconn = NULL;
4543
4544#define COUNT_BY_STATE(_state, _list) \
4545do { \
4546 if (conn_state & (_state)) { \
4547 tconn = NULL; \
4548 while ((tconn = fr_dlist_next(&trunk->_list, tconn))) { \
4549 count += trunk_request_count_by_connection(tconn, req_state); \
4550 } \
4551 } \
4552} while (0)
4553
4554 if (conn_state & TRUNK_CONN_ACTIVE) {
4555 for (tconn = fr_minmax_heap_iter_init(trunk->active, &iter);
4556 tconn;
4557 tconn = fr_minmax_heap_iter_next(trunk->active, &iter)) {
4558 count += trunk_request_count_by_connection(tconn, req_state);
4559 }
4560 }
4561
4564 COUNT_BY_STATE(TRUNK_CONN_INACTIVE_DRAINING, inactive_draining);
4567
4569
4570 return count;
4571}
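/*
 *	Hedged usage sketch (not from the original source; 'trunk' is assumed
 *	to be a valid trunk_t allocated elsewhere):
 *
 *	@code
 *	uint64_t pending;
 *
 *	pending = trunk_request_count_by_state(trunk, TRUNK_CONN_ACTIVE,
 *					       TRUNK_REQUEST_STATE_PENDING);
 *	if (pending > 0) DEBUG3("%" PRIu64 " requests still waiting to be sent", pending);
 *	@endcode
 */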
4572
4573/** Update timestamps for when we last had a transition from above target to below target or vice versa
4574 *
4575 * Should be called every time a connection or request is allocated or freed.
4576 *
4577 * @param[out] conn_count_out How many connections we considered.
4578 * @param[out] req_count_out How many requests we considered.
4579 * @param[in] trunk to operate on.
4580 * @param[in] now The current time.
4581 * @param[in] verify if true (and this is a debug build), then assert if req_per_conn
4582 * has changed.
4583 * @return
4584 * - 0 if the average couldn't be calculated (no requests or no connections).
4585 * - The average number of requests per connection.
4586 */
4587static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_count_out,
4588 trunk_t *trunk, fr_time_t now,
4589 NDEBUG_UNUSED bool verify)
4590{
4591 uint32_t req_count = 0;
4592 uint16_t conn_count = 0;
4593 uint64_t req_per_conn = 0;
4594
4596
4597 /*
4598 * No need to update these as the trunk is being freed
4599 */
4600 if (trunk->freeing) goto done;
4601
4602 /*
4603 * Count all connections except draining and draining to free.
4604 *
4605 * Omitting these connection states artificially raises the
4606 * request to connection ratio, so that we can preemptively spawn
4607 * new connections.
4608 *
4609 * In the case of TRUNK_CONN_DRAINING | TRUNK_CONN_INACTIVE_DRAINING
4610 * the trunk management code has enough hysteresis to not
4611 * immediately reactivate the connection.
4612 *
4613 * In the case of TRUNK_CONN_DRAINING_TO_FREE the trunk
4614 * management code should spawn a new connection to take its place.
4615 *
4616 * Connections placed in the DRAINING_TO_FREE state are being
4617 * closed preemptively to deal with bugs on the server we're
4618 * talking to, or misconfigured firewalls which are trashing
4619 * TCP/UDP connection states.
4620 */
4625
4626 /*
4627 * Requests on all connections
4628 */
4629 req_count = trunk_request_count_by_state(trunk,
4632
4633 /*
4634 * No connections, but we do have requests
4635 */
4636 if (conn_count == 0) {
4637 if ((req_count > 0) && (trunk->conf.target_req_per_conn > 0)) goto above_target;
4638 goto done;
4639 }
4640
4641 if (req_count == 0) {
4642 if (trunk->conf.target_req_per_conn > 0) goto below_target;
4643 goto done;
4644 }
4645
4646 /*
4647 * Calculate the req_per_conn
4648 */
4649 req_per_conn = ROUND_UP_DIV(req_count, conn_count);
4650 if (req_per_conn > trunk->conf.target_req_per_conn) {
4651 above_target:
4652 /*
4653 * Edge - Below target to above target (too many requests per conn - spawn more)
4654 *
4655 * The equality check is correct here as both values start at 0.
4656 */
4658 } else if (req_per_conn < trunk->conf.target_req_per_conn) {
4659 below_target:
4660 /*
4661 * Edge - Above target to below target (too few requests per conn - close some)
4662 *
4663 * The equality check is correct here as both values start at 0.
4664 */
4666 }
4667
4668done:
4669 if (conn_count_out) *conn_count_out = conn_count;
4670 if (req_count_out) *req_count_out = req_count;
4671
4672 /*
4673 * Check we haven't missed a call to trunk_requests_per_connection
4674 */
4675 fr_assert(!verify || (trunk->last_req_per_conn == 0) || (req_per_conn == trunk->last_req_per_conn));
4676
4677 trunk->last_req_per_conn = req_per_conn;
4678
4679 return req_per_conn;
4680}
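/*
 *	Worked example (illustrative, not from the original source): with 7
 *	requests over 4 counted connections, req_per_conn = ROUND_UP_DIV(7, 4) = 2.
 *	If target_req_per_conn is also 2, neither edge above fires and the
 *	last_above_target / last_below_target timestamps are left alone; only
 *	a move to 3 or more (or down to 1) records a new edge for the
 *	management hysteresis in trunk_manage().
 */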
4681
4682/** Drain the backlog of as many requests as possible
4683 *
4684 * @param[in] trunk To drain backlog requests for.
4685 */
4686static void trunk_backlog_drain(trunk_t *trunk)
4687{
4688 trunk_request_t *treq;
4689
4690 if (fr_heap_num_elements(trunk->backlog) == 0) return;
4691
4692 /*
4693 * If it's always writable, this isn't
4694 * really a noteworthy event.
4695 */
4696 if (!trunk->conf.always_writable) DEBUG3("Draining backlog of requests");
4697
4698 /*
4699 * Do *NOT* add an artificial limit
4700 * here. We rely on all available
4701 * connections entering the full
4702 * state and transitioning back to
4703 * active in order to drain the
4704 * backlog.
4705 */
4706 while ((treq = fr_heap_peek(trunk->backlog))) {
4707 switch (trunk_request_enqueue_existing(treq)) {
4708 case TRUNK_ENQUEUE_OK:
4709 continue;
4710
4711 /*
4712 * Signal to stop
4713 */
4715 break;
4716
4717 /*
4718 * Failed enqueueing the request,
4719 * have it enter the failed state
4720 * which will free it and
4721 * re-enliven the yielded request.
4722 */
4724 case TRUNK_ENQUEUE_FAIL:
4726 continue;
4727
4730 return;
4731 }
4732 }
4733}
4734
4735/** Force the trunk to re-establish its connections
4736 *
4737 * @param[in] trunk to signal.
4738 * @param[in] states One or more states or'd together.
4739 * @param[in] reason Why the connections are being signalled to reconnect.
4740 */
4741void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
4742{
4743
4744#define RECONNECT_BY_STATE(_state, _list) \
4745do { \
4746 if (states & (_state)) { \
4747 size_t i; \
4748 for (i = fr_dlist_num_elements(&trunk->_list); i > 0; i--) { \
4749 connection_signal_reconnect(((trunk_connection_t *)fr_dlist_tail(&trunk->_list))->pub.conn, reason); \
4750 } \
4751 } \
4752} while (0)
4753
4754 /*
4755 * Connections in the 'connecting' state
4756 * may re-enter that state, so we need to
4757 * be careful not to enter an infinite
4758 * loop, as we iterate over the list
4759 * again and again.
4760 */
4762
4763 if (states & TRUNK_CONN_ACTIVE) {
4764 trunk_connection_t *tconn;
4765 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_reconnect(tconn->pub.conn, reason);
4766 }
4767
4775}
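/*
 *	Hedged usage sketch (not from the original source; 'reason' stands in
 *	for whichever connection_reason_t value the caller wants to report):
 *
 *	@code
 *	trunk_reconnect(trunk, TRUNK_CONN_ACTIVE, reason);
 *	@endcode
 *
 *	Additional TRUNK_CONN_* states may be or'd into the second argument;
 *	active connections are torn down via the heap loop above, the listed
 *	states via RECONNECT_BY_STATE().
 */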
4776
4777/** Start the trunk running
4778 *
4779 */
4781{
4782 uint16_t i;
4783
4784 if (unlikely(trunk->started)) return 0;
4785
4786 /*
4787 * Spawn the initial set of connections
4788 */
4789 for (i = 0; i < trunk->conf.start; i++) {
4790 DEBUG("[%i] Starting initial connection", i);
4791 if (trunk_connection_spawn(trunk, fr_time()) != 0) return -1;
4792 }
4793
4794 /*
4795 * If the idle timeout is set, AND there's no management interval, OR the management interval is
4796 * greater than the idle timeout, update the management interval.
4797 */
4801 trunk->conf.manage_interval = trunk->conf.idle_timeout;
4802 }
4803
4805 /*
4806 * Insert the event timer to manage
4807 * the interval between managing connections.
4808 */
4809 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, trunk->conf.manage_interval,
4810 false, _trunk_timer, trunk) < 0) {
4811 PERROR("Failed inserting trunk management event");
4812 return -1;
4813 }
4814 }
4815 trunk->started = true;
4816 trunk->managing_connections = true;
4817
4818 return 0;
4819}
4820
4821/** Allow the trunk to open and close connections in response to load
4822 *
4823 */
4825{
4826 if (!trunk->started || trunk->managing_connections) return;
4827
4828 DEBUG3("Connection management enabled");
4829 trunk->managing_connections = true;
4830}
4831
4832/** Stop the trunk from opening and closing connections in response to load
4833 *
4834 */
4836{
4837 if (!trunk->started || !trunk->managing_connections) return;
4838
4839 DEBUG3("Connection management disabled");
4840 trunk->managing_connections = false;
4841}
4842
4843/** Schedule a trunk management event for the next time the event loop is executed
4844 */
4846{
4847 if (!trunk->started || !trunk->managing_connections) return 0;
4848
4849 if (fr_timer_in(trunk, trunk->el->tl, &trunk->manage_ev, fr_time_delta_wrap(0),
4850 false, _trunk_timer, trunk) < 0) {
4851 PERROR("Failed inserting trunk management event");
4852 return -1;
4853 }
4854
4855 return 0;
4856}
4857
4858/** Order connections by queue depth
4859 *
4860 */
4861static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
4862{
4865
4868
4869 /*
4870 * Add a fudge factor of 1 to reduce spurious rebalancing
4871 */
4872 return ((a_count > b_count) && ((a_count - b_count) > 1)) - ((b_count > a_count) && ((b_count - a_count) > 1));
4873}
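/*
 *	Worked example (illustrative, not from the original source): with
 *	a_count = 5 and b_count = 3 the difference exceeds the fudge factor,
 *	so the comparator returns +1; with a_count = 4 and b_count = 3 both
 *	terms evaluate to 0 and the connections compare equal, so a difference
 *	of a single request never causes the heap to reorder.
 */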
4874
4875/** Free a trunk, gracefully closing all connections.
4876 *
4877 */
4878static int _trunk_free(trunk_t *trunk)
4879{
4880 trunk_connection_t *tconn;
4881 trunk_request_t *treq;
4882 trunk_watch_entry_t *watch;
4883 size_t i;
4884
4885 DEBUG4("Trunk free %p", trunk);
4886
4887 trunk->freeing = true; /* Prevent re-enqueuing */
4888
4889 /*
4890 * We really don't want this firing after
4891 * we've freed everything.
4892 */
4894
4895 /*
4896 * Now free the connections in each of the lists.
4897 *
4898 * Each time a connection is freed it removes itself from the list
4899 * it's in, which means the head should keep advancing automatically.
4900 */
4901 while ((tconn = fr_minmax_heap_min_peek(trunk->active))) connection_signal_halt(tconn->pub.conn);
4902 while ((tconn = fr_dlist_head(&trunk->init))) connection_signal_halt(tconn->pub.conn);
4903 while ((tconn = fr_dlist_head(&trunk->connecting))) connection_signal_halt(tconn->pub.conn);
4904 while ((tconn = fr_dlist_head(&trunk->full))) connection_signal_halt(tconn->pub.conn);
4905 while ((tconn = fr_dlist_head(&trunk->inactive))) connection_signal_halt(tconn->pub.conn);
4906 while ((tconn = fr_dlist_head(&trunk->inactive_draining))) connection_signal_halt(tconn->pub.conn);
4907 while ((tconn = fr_dlist_head(&trunk->closed))) connection_signal_halt(tconn->pub.conn);
4908 while ((tconn = fr_dlist_head(&trunk->draining))) connection_signal_halt(tconn->pub.conn);
4909 while ((tconn = fr_dlist_head(&trunk->draining_to_free))) connection_signal_halt(tconn->pub.conn);
4910
4911 /*
4912 * Process any deferred connection frees
4913 */
4914 while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
4915
4916 /*
4917 * Free any requests left in the backlog
4918 */
4919 while ((treq = fr_heap_peek(trunk->backlog))) trunk_request_enter_failed(treq);
4920
4921 /*
4922 * Free any requests in our request cache
4923 */
4924 while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
4925
4926 /*
4927 * Free any entries in the watch lists
4928 */
4929 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
4930 while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
4931 }
4932
4933 return 0;
4934}
4935
4936/** Allocate a new collection of connections
4937 *
4938 * This function should be called first to allocate a new trunk.
4939 *
4940 * After the trunk has been allocated, #trunk_request_alloc and
4941 * #trunk_request_enqueue should be used to allocate memory for trunk
4942 * requests, and pass a preq (protocol request) to the trunk for
4943 * processing.
4944 *
4945 * The trunk will then asynchronously process the request, writing the result
4946 * to a specified rctx. See #trunk_request_enqueue for more details.
4947 *
4948 * @note Trunks may not be shared between multiple threads under any circumstances.
4949 *
4950 * @param[in] ctx To use for any memory allocations. Must be thread local.
4951 * @param[in] el to use for I/O and timer events.
4952 * @param[in] funcs Callback functions.
4953 * @param[in] conf Common user configurable parameters.
4954 * @param[in] log_prefix To prepend to global messages.
4955 * @param[in] uctx User data to pass to the alloc function.
4956 * @param[in] delay_start If true, then we will not spawn any connections
4957 * until the first request is enqueued.
4958 * @param[in] trigger_args Pairs to pass to trigger requests, if triggers are enabled.
4959 * @return
4960 * - New trunk handle on success.
4961 * - NULL on error.
4962 */
4963trunk_t *trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el,
4964 trunk_io_funcs_t const *funcs, trunk_conf_t const *conf,
4965 char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
4966{
4967 trunk_t *trunk;
4968 size_t i;
4969
4970 /*
4971 * Check we have the functions we need
4972 */
4973 if (!fr_cond_assert(funcs->connection_alloc)) return NULL;
4974
4975 MEM(trunk = talloc_zero(ctx, trunk_t));
4976 trunk->el = el;
4977 trunk->log_prefix = talloc_strdup(trunk, log_prefix);
4978 trunk->trigger_args = trigger_args;
4979
4980 memcpy(&trunk->funcs, funcs, sizeof(trunk->funcs));
4981 if (!trunk->funcs.connection_prioritise) {
4983 }
4985
4986 memcpy(&trunk->conf, conf, sizeof(trunk->conf));
4987
4988 memcpy(&trunk->uctx, &uctx, sizeof(trunk->uctx));
4989 talloc_set_destructor(trunk, _trunk_free);
4990
4991 /*
4992 * Unused request list...
4993 */
4995
4996 /*
4997 * Request backlog queue
4998 */
5000 trunk_request_t, heap_id, 0));
5001
5002 /*
5003 * Connection queues and trees
5004 */
5006 trunk_connection_t, heap_id, 0));
5016
5017 /*
5018 * Watch lists
5019 */
5020 for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5022 }
5023
5024 DEBUG4("Trunk allocated %p", trunk);
5025
5026 if (!delay_start) {
5027 if (trunk_start(trunk) < 0) {
5028 talloc_free(trunk);
5029 return NULL;
5030 }
5031 }
5032
5033 return trunk;
5034}
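/*
 *	Hedged usage sketch (not from the original source; my_funcs, my_conf
 *	and my_thread are placeholder names):
 *
 *	@code
 *	trunk_t *trunk;
 *
 *	trunk = trunk_alloc(ctx, el, &my_funcs, &my_conf,
 *			    "my_module", my_thread, true, NULL);
 *	if (!trunk) return -1;
 *
 *	//	... later, once the thread is ready to accept requests ...
 *	if (trunk_start(trunk) < 0) return -1;
 *	@endcode
 *
 *	Passing delay_start = true defers spawning the initial connections
 *	until trunk_start() is called (or the first request is enqueued).
 */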
5035
5036/** Check for a module trigger section when parsing the `triggers` option.
5037 *
5038 */
5039int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
5040{
5043
5044 if (cf_pair_parse_value(ctx, out, parent, ci, rule) < 0) return -1;
5045
5046 /*
5047 * If the parent section of the `triggers` option contains a trigger
5048 * section then store it as the module CONF SECTION for the appropriate
5049 * trigger group.
5050 */
5051 if (cf_section_find(cs, "trigger", NULL)) {
5052 if (strcmp(cf_section_name(cs), "request") == 0) {
5053 conf->req_trigger_cs = cs;
5054 } else {
5055 conf->conn_trigger_cs = cs;
5056 }
5057 }
5058
5059 return 0;
5060}
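/*
 *	Hedged illustration (hypothetical module configuration, not from the
 *	original source):
 *
 *	request {
 *		triggers = yes
 *		trigger {
 *			...
 *		}
 *	}
 *
 *	Because the section enclosing the 'triggers' option is named "request",
 *	the trigger subsection is stored in conf->req_trigger_cs; any other
 *	enclosing section name stores it in conf->conn_trigger_cs.
 */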
5061
5062#ifndef TALLOC_GET_TYPE_ABORT_NOOP
5063/** Verify a trunk
5064 *
5065 * A trunk has some number of connections, which each have some number of requests. The connections and
5066 * requests are in differing kinds of containers depending on their state and how they are used, and may
5067 * have fields that can only be validated by comparison with a parent. We had planned on passing a "context"
5068 * down with the ancestral values, but that breaks the foo_verify() API. Each foo_verify() will only verify the
5069 * foo's children.
5070 */
5071void trunk_verify(char const *file, int line, trunk_t *trunk)
5072{
5073 fr_fatal_assert_msg(trunk, "CONSISTENCY CHECK FAILED %s[%i]: trunk_t pointer was NULL", file, line);
5074 (void) talloc_get_type_abort(trunk, trunk_t);
5075
5076 for (size_t i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
5077 _fr_dlist_verify(file, line, &trunk->watch[i]);
5078 }
5079
5080#define IO_FUNC_VERIFY(_func) \
5081 fr_fatal_assert_msg(trunk->funcs._func, "CONSISTENCY_CHECK_FAILED %s[%i]: " #_func " was NULL", file, line)
5082
5083 /*
5084 * Only a few of the function pointers *must* be non-NULL..
5085 */
5087 IO_FUNC_VERIFY(connection_prioritise);
5089
5090#define TRUNK_TCONN_CHECKS(_tconn, _state) \
5091do { \
5092 fr_fatal_assert_msg(trunk == _tconn->pub.trunk, \
5093 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-trunk mismatch", file, line); \
5094 fr_fatal_assert_msg(_state == _tconn->pub.state, \
5095 "CONSISTENCY_CHECK_FAILED %s[%i}: connection-state mismatch", file, line); \
5096} while (0)
5097
5098#define TCONN_DLIST_VERIFY(_dlist, _state) \
5099do { \
5100 _fr_dlist_verify(file, line, &(trunk->_dlist)); \
5101 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5102 trunk_connection_verify(file, line, tconn); \
5103 TRUNK_TCONN_CHECKS(tconn, _state); \
5104 } \
5105} while (0)
5106
5107#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state) \
5108do {\
5109 fr_minmax_heap_verify(file, line, trunk->_heap); \
5110 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5111 trunk_connection_verify(file, line, tconn); \
5112 TRUNK_TCONN_CHECKS(tconn, _state); \
5113 }} \
5114} while (0)
5115
5116 fr_dlist_verify(&(trunk->free_requests));
5117 FR_HEAP_VERIFY(trunk->backlog);
5118
5125 /* TCONN_DLIST_VERIFY(failed, ???); */
5130}
5131
5133{
5134 fr_fatal_assert_msg(tconn, "CONSISTENCY CHECK FAILED %s[%i]: trunk_connection_t pointer was NULL", file, line);
5135 (void) talloc_get_type_abort(tconn, trunk_connection_t);
5136
5137 (void) talloc_get_type_abort(tconn->pub.trunk, trunk_t);
5138
5139 /*
5140 * shouldn't be both in heap and on list--but it doesn't look like moves
5141 * to active heap wipe the dlist pointers.
5142 */
5143
5144#define TCONN_TREQ_CHECKS(_treq, _state) \
5145do { \
5146 fr_fatal_assert_msg(tconn == _treq->pub.tconn, \
5147 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-tconn mismatch", file, line); \
5148 fr_fatal_assert_msg(tconn->pub.trunk == _treq->pub.trunk, \
5149 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-trunk mismatch", file, line); \
5150 fr_fatal_assert_msg(_state == _treq->pub.state, \
5151 "CONSISTENCY_CHECK_FAILED %s[%i}: trunk request-state mismatch", file, line); \
5152} while (0)
5153
5154#define TREQ_DLIST_VERIFY(_dlist, _state) \
5155do { \
5156 _fr_dlist_verify(file, line, &(tconn->_dlist)); \
5157 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5158 trunk_request_verify(file, line, treq); \
5159 TCONN_TREQ_CHECKS(treq, _state); \
5160 } \
5161} while (0)
5162
5163#define TREQ_HEAP_VERIFY(_heap, _state) \
5164do { \
5165 fr_heap_iter_t _iter; \
5166 fr_heap_verify(file, line, tconn->_heap); \
5167 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5168 treq; \
5169 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5170 trunk_request_verify(file, line, treq); \
5171 TCONN_TREQ_CHECKS(treq, _state); \
5172 } \
5173} while (0)
5174
5175#define TREQ_OPTION_VERIFY(_option, _state) \
5176do { \
5177 if (tconn->_option) { \
5178 trunk_request_verify(file, line, tconn->_option); \
5179 TCONN_TREQ_CHECKS(tconn->_option, _state); \
5180 } \
5181} while (0)
5182
5183 /* verify associated requests */
5190}
5191
5192void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
5193{
5194 fr_fatal_assert_msg(treq, "CONSISTENCY CHECK FAILED %s[%i]: trunk_request_t pointer was NULL", file, line);
5195 (void) talloc_get_type_abort(treq, trunk_request_t);
5196
5197#ifdef WITH_VERIFY_PTR
5198 if (treq->pub.request) request_verify(file, line, treq->pub.request);
5199#endif
5200}
5201
5202
5203bool trunk_search(trunk_t *trunk, void *ptr)
5204{
5205#define TCONN_DLIST_SEARCH(_dlist) \
5206do { \
5207 fr_dlist_foreach(&(trunk->_dlist), trunk_connection_t, tconn) { \
5208 if (ptr == tconn) { \
5209 fr_fprintf(stderr, "trunk_search: tconn %p on " #_dlist "\n", ptr); \
5210 return true; \
5211 } \
5212 if (trunk_connection_search(tconn, ptr)) { \
5213 fr_fprintf(stderr, " in tconn %p on " #_dlist "\n", tconn); \
5214 return true; \
5215 } \
5216 } \
5217} while (0)
5218
5219#define TCONN_MINMAX_HEAP_SEARCH(_heap) \
5220do { \
5221 fr_minmax_heap_foreach(trunk->_heap, trunk_connection_t, tconn) { \
5222 if (ptr == tconn) { \
5223 fr_fprintf(stderr, "trunk_search: tconn %p on " #_heap "\n", ptr); \
5224 return true; \
5225 } \
5226 if (trunk_connection_search(tconn, ptr)) { \
5227 fr_fprintf(stderr, " on tconn %p on " #_heap "\n", tconn); \
5228 return true; \
5229 } \
5230 }}\
5231} while (0)
5232
5234 TCONN_DLIST_SEARCH(connecting);
5236 TCONN_DLIST_SEARCH(full);
5237 TCONN_DLIST_SEARCH(inactive);
5238 TCONN_DLIST_SEARCH(inactive_draining);
5239 TCONN_DLIST_SEARCH(failed);
5240 TCONN_DLIST_SEARCH(closed);
5241 TCONN_DLIST_SEARCH(draining);
5242 TCONN_DLIST_SEARCH(draining_to_free);
5243 TCONN_DLIST_SEARCH(to_free);
5244
5245 return false;
5246}
5247
5249{
5250#define TREQ_DLIST_SEARCH(_dlist) \
5251do { \
5252 fr_dlist_foreach(&(tconn->_dlist), trunk_request_t, treq) { \
5253 if (ptr == treq) { \
5254 fr_fprintf(stderr, "trunk_search: treq %p on " #_dlist "\n", ptr); \
5255 return true; \
5256 } \
5257 if (trunk_request_search(treq, ptr)) { \
5258 fr_fprintf(stderr, "trunk_search: preq %p found on " #_dlist, ptr); \
5259 return true; \
5260 } \
5261 } \
5262} while (0)
5263
5264#define TREQ_HEAP_SEARCH(_heap) \
5265do { \
5266 fr_heap_iter_t _iter; \
5267 for (trunk_request_t *treq = fr_heap_iter_init(tconn->_heap, &_iter); \
5268 treq; \
5269 treq = fr_heap_iter_next(tconn->_heap, &_iter)) { \
5270 if (ptr == treq) { \
5271 fr_fprintf(stderr, "trunk_search: treq %p in " #_heap "\n", ptr); \
5272 return true; \
5273 } \
5274 if (trunk_request_search(treq, ptr)) { \
5275 fr_fprintf(stderr, "trunk_search: preq %p found in " #_heap, ptr); \
5276 return true; \
5277 } \
5278 } \
5279} while (0)
5280
5281#define TREQ_OPTION_SEARCH(_option) \
5282do { \
5283 if (tconn->_option) { \
5284 if (ptr == tconn->_option) { \
5285 fr_fprintf(stderr, "trunk_search: treq %p is " #_option "\n", ptr); \
5286 return true; \
5287 } \
5288 if (trunk_request_search(tconn->_option, ptr)) { \
5289 fr_fprintf(stderr, "trunk_search: preq %p found in " #_option, ptr); \
5290 return true; \
5291 } \
5292 } \
5293} while (0)
5294
5295 /* search associated requests */
5296 TREQ_HEAP_SEARCH(pending);
5297 TREQ_DLIST_SEARCH(sent);
5298 TREQ_DLIST_SEARCH(cancel);
5299 TREQ_DLIST_SEARCH(cancel_sent);
5300 TREQ_OPTION_SEARCH(partial);
5301 TREQ_OPTION_SEARCH(cancel_partial);
5302
5303 return false;
5304}
5305
5307{
5308 return treq->pub.preq == ptr;
5309}
5310#endif
int const char * file
Definition acutest.h:702
int const char int line
Definition acutest.h:702
void request_verify(UNUSED char const *file, UNUSED int line, UNUSED request_t *request)
static bool init
Definition fuzzer.c:41
#define L(_str)
Helper for initialising arrays of string literals.
Definition build.h:209
#define NDEBUG_UNUSED
Definition build.h:328
#define FALL_THROUGH
clang 10 doesn't recognised the FALL-THROUGH comment anymore
Definition build.h:324
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
#define NUM_ELEMENTS(_t)
Definition build.h:339
int cf_pair_parse_value(TALLOC_CTX *ctx, void *out, UNUSED void *base, CONF_ITEM *ci, conf_parser_t const *rule)
Parses a CONF_PAIR into a C data type.
Definition cf_parse.c:214
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:660
cf_parse_t func
Override default parsing behaviour for the specified type with a custom parsing function.
Definition cf_parse.h:614
#define FR_CONF_OFFSET(_name, _struct, _field)
conf_parser_t which parses a single CONF_PAIR, writing the result to a field in a struct
Definition cf_parse.h:283
#define FR_CONF_POINTER(_name, _type, _flags, _res_p)
conf_parser_t which parses a single CONF_PAIR producing a single global result
Definition cf_parse.h:337
#define FR_CONF_OFFSET_SUBSECTION(_name, _flags, _struct, _field, _subcs)
conf_parser_t which populates a sub-struct using a CONF_SECTION
Definition cf_parse.h:312
@ CONF_FLAG_SUBSECTION
Instead of putting the information into a configuration structure, the configuration file routines MA...
Definition cf_parse.h:426
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:597
Common header for all CONF_* types.
Definition cf_priv.h:49
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:70
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
CONF_SECTION * cf_item_to_section(CONF_ITEM const *ci)
Cast a CONF_ITEM to a CONF_SECTION.
Definition cf_util.c:683
char const * cf_section_name(CONF_SECTION const *cs)
Return name2 if set, else name1.
Definition cf_util.c:1196
#define cf_parent(_cf)
Definition cf_util.h:101
connection_state_t
Definition connection.h:47
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:56
@ CONNECTION_STATE_HALTED
The connection is in a halted stat.
Definition connection.h:48
@ CONNECTION_STATE_CLOSED
Connection has been closed.
Definition connection.h:57
@ CONNECTION_STATE_CONNECTED
File descriptor is open (ready for writing).
Definition connection.h:54
@ CONNECTION_STATE_INIT
Init state, sets up connection.
Definition connection.h:51
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:52
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:55
connection_reason_t
Definition connection.h:84
static size_t min(size_t x, size_t y)
Definition dbuff.c:66
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:202
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:176
#define MEM(x)
Definition debug.h:36
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static void _fr_dlist_verify(char const *file, int line, fr_dlist_head_t const *list_head)
Check all items in the list are valid.
Definition dlist.h:735
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void * fr_dlist_prev(fr_dlist_head_t const *list_head, void const *ptr)
Get the previous item in a list.
Definition dlist.h:588
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static void * fr_dlist_tail(fr_dlist_head_t const *list_head)
Return the TAIL item of a list or NULL if the list is empty.
Definition dlist.h:531
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
#define fr_dlist_verify(_head)
Definition dlist.h:755
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:338
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:555
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
unsigned int fr_heap_index_t
Definition heap.h:80
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
#define FR_HEAP_VERIFY(_heap)
Definition heap.h:212
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_LOCAL_ROPTIONAL(_entry, _l_request, _l_global, _fmt,...)
Rate limit messages using a local limiting entry.
Definition log.h:606
Track when a log message was last repeated.
Definition log.h:547
talloc_free(reap)
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
fr_log_type_t
Definition log.h:54
#define ROUND_UP_DIV(_x, _y)
Get the ceiling value of integer division.
Definition math.h:153
unsigned short uint16_t
unsigned int uint32_t
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_iter_next(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Get the next entry in a minmax heap.
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
void * fr_minmax_heap_iter_init(fr_minmax_heap_t *hp, fr_minmax_heap_iter_t *iter)
Iterate over entries in a minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
unsigned int fr_minmax_heap_iter_t
Definition minmax_heap.h:38
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
int8_t fr_pointer_cmp(void const *a, void const *b)
Compares two pointers.
Definition misc.c:418
static int8_t request_prioritise(void const *one, void const *two)
Definition bio.c:1153
#define fr_assert(_expr)
Definition rad_assert.h:38
static bool done
Definition radclient.c:81
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
#define WARN(fmt,...)
Definition radclient.h:47
#define INFO(fmt,...)
Definition radict.c:54
static fr_event_list_t * events
Definition radsniff.c:59
static rs_t * conf
Definition radsniff.c:53
void connection_signal_shutdown(connection_t *conn)
Shuts down a connection gracefully.
int connection_del_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a post list.
Definition connection.c:474
void connection_signal_halt(connection_t *conn)
Shuts down a connection ungracefully.
void connection_signals_resume(connection_t *conn)
Resume processing of deferred signals.
Definition connection.c:321
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
void connection_signal_init(connection_t *conn)
Asynchronously signal a halted connection to start.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
connection_watch_entry_t * connection_add_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed before a state function has been called.
Definition connection.c:512
connection_watch_entry_t * connection_add_watch_post(connection_t *conn, connection_state_t state, connection_watch_t watch, bool oneshot, void const *uctx)
Add a callback to be executed after a state function has been called.
Definition connection.c:534
int connection_del_watch_pre(connection_t *conn, connection_state_t state, connection_watch_t watch)
Remove a watch function from a pre list.
Definition connection.c:457
void connection_signals_pause(connection_t *conn)
Pause processing of deferred signals.
Definition connection.c:312
static fr_time_t test_time_base
Definition slab_tests.c:42
return count
Definition module.c:155
fr_time_t test_time
Definition state_test.c:3
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_relaxed
Definition stdatomic.h:127
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define ATOMIC_VAR_INIT(value)
Definition stdatomic.h:88
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
An element in a table indexed by bit position.
Definition table.h:83
An element in an arbitrarily ordered array of name to num mappings.
Definition table.h:57
#define talloc_get_type_abort_const
Definition talloc.h:287
#define talloc_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:180
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_lteq(_a, _b)
Definition time.h:240
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
#define fr_time_lt(_a, _b)
Definition time.h:239
#define fr_time_delta_gt(_a, _b)
Definition time.h:283
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:50
A timer event.
Definition timer.c:84
#define FR_TIMER_DELETE(_ev_p)
Definition timer.h:103
#define FR_TIMER_DELETE_RETURN(_ev_p)
Definition timer.h:110
#define fr_timer_in(...)
Definition timer.h:87
#define FR_TIMER_DISARM(_ev)
Definition timer.h:91
bool trunk_search(trunk_t *trunk, void *ptr)
Definition trunk.c:5203
static atomic_uint_fast64_t request_counter
Definition trunk.c:54
CONF_PAIR * trigger_cp[NUM_ELEMENTS(trunk_conn_trigger_names)]
Cached trigger CONF_PAIRs.
Definition trunk.c:319
static void trunk_connection_enter_active(trunk_connection_t *tconn)
Transition a connection back to the active state.
Definition trunk.c:3264
#define CONN_REORDER(_tconn)
Reorder the connections in the active heap.
Definition trunk.c:790
static size_t trunk_req_trigger_names_len
Definition trunk.c:386
int trunk_connection_pop_cancellation(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a cancellation request off a connection's cancellation queue.
Definition trunk.c:3851
fr_dlist_head_t cancel
Requests in the cancel state.
Definition trunk.c:161
int trunk_connection_manage_schedule(trunk_t *trunk)
Schedule a trunk management event for the next time the event loop is executed.
Definition trunk.c:4845
#define REQUEST_EXTRACT_SENT(_treq)
Remove the current request from the sent list.
Definition trunk.c:760
static void _trunk_connection_on_shutdown(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the shutdown state.
Definition trunk.c:3414
struct trunk_watch_entry_s trunk_watch_entry_t
An entry in a trunk watch function list.
fr_dlist_head_t reapable
Idle request.
Definition trunk.c:159
fr_heap_t * pending
Requests waiting to be sent.
Definition trunk.c:153
trunk_conf_t conf
Trunk common configuration.
Definition trunk.c:224
static size_t trunk_connection_states_len
Definition trunk.c:425
#define REQUEST_EXTRACT_REAPABLE(_treq)
Remove the current request from the reapable list.
Definition trunk.c:765
trunk_connection_t * tconn
The request was associated with.
Definition trunk.c:82
void trunk_connection_callback_readable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O read function.
Definition trunk.c:4030
fr_rate_limit_t limit_last_failure_log
Rate limit on "Refusing to enqueue requests - No active conns".
Definition trunk.c:298
void trunk_verify(char const *file, int line, trunk_t *trunk)
Verify a trunk.
Definition trunk.c:5071
fr_timer_t * manage_ev
Periodic connection management event.
Definition trunk.c:290
#define IN_HANDLER(_trunk)
Definition trunk.c:720
static fr_table_num_ordered_t const trunk_connection_states[]
Definition trunk.c:413
void trunk_reconnect(trunk_t *trunk, int states, connection_reason_t reason)
Force the trunk to re-establish its connections.
Definition trunk.c:4741
void trunk_connection_callback_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
Standard I/O write function.
Definition trunk.c:4047
void * uctx
User data to pass to the function.
Definition trunk.c:191
static void trunk_request_enter_pending(trunk_request_t *treq, trunk_connection_t *tconn, bool new)
Transition a request to the pending state, adding it to the backlog of an active connection.
Definition trunk.c:1168
static void trunk_request_remove_from_conn(trunk_request_t *treq)
Remove a request from all connection lists.
Definition trunk.c:977
fr_rate_limit_t limit_max_requests_alloc_log
Rate limit on "Refusing to alloc requests - Limit of * requests reached".
Definition trunk.c:296
trunk_request_state_t to
What state we transitioned to.
Definition trunk.c:80
static int8_t _trunk_request_prioritise(void const *a, void const *b)
Compare two protocol requests.
Definition trunk.c:953
static void trunk_manage(trunk_t *trunk, fr_time_t now)
Implements the algorithm we use to manage requests per connection levels.
Definition trunk.c:4179
static int _trunk_connection_free(trunk_connection_t *tconn)
Free a connection.
Definition trunk.c:3696
trunk_io_funcs_t funcs
I/O functions.
Definition trunk.c:276
fr_dlist_head_t draining
Connections that will be freed once all their requests are complete, but can be reactivated.
Definition trunk.c:261
#define REQUEST_EXTRACT_CANCEL_PARTIAL(_treq)
Remove the current request from the cancel_partial slot.
Definition trunk.c:775
int trunk_trigger_cf_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, conf_parser_t const *rule)
Check for a module trigger section when parsing the triggers option.
Definition trunk.c:5039
int trunk_start(trunk_t *trunk)
Start the trunk running.
Definition trunk.c:4780
void trunk_request_signal_partial(trunk_request_t *treq)
Signal a partial write.
Definition trunk.c:2045
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2148
#define TREQ_OPTION_SEARCH(_option)
void trunk_request_signal_cancel_sent(trunk_request_t *treq)
Signal that a remote server has been notified of the cancellation.
Definition trunk.c:2276
static void trunk_connection_enter_draining_to_free(trunk_connection_t *tconn)
Transition a connection to the draining-to-reconnect state.
Definition trunk.c:3228
trunk_watch_t func
Function to call when a trunk enters.
Definition trunk.c:187
void trunk_connection_signal_readable(trunk_connection_t *tconn)
Signal that a trunk connection is readable.
Definition trunk.c:3937
#define DO_REQUEST_FREE(_treq)
Call the free callback (if set)
Definition trunk.c:612
trunk_request_t * trunk_request_alloc(trunk_t *trunk, request_t *request)
(Pre-)Allocate a new trunk request
Definition trunk.c:2490
static void _trunk_connection_on_halted(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the halted state.
Definition trunk.c:3648
#define REQUEST_EXTRACT_BACKLOG(_treq)
Remove the current request from the backlog.
Definition trunk.c:731
fr_heap_index_t heap_id
Used to track the connection in the connected heap.
Definition trunk.c:138
fr_dlist_head_t closed
Connections that have closed.
Definition trunk.c:258
fr_dlist_head_t watch[TRUNK_STATE_MAX]
To be called when trunk changes state.
Definition trunk.c:282
static void trunk_watch_call(trunk_t *trunk, fr_dlist_head_t *list, trunk_state_t state)
Call a list of watch functions associated with a state.
Definition trunk.c:803
static void trunk_request_enter_cancel_complete(trunk_request_t *treq)
Cancellation was acked, the request is complete, free it.
Definition trunk.c:1520
int line
Line change occurred on.
Definition trunk.c:92
static void trunk_connection_enter_inactive_draining(trunk_connection_t *tconn)
Transition a connection to the inactive-draining state.
Definition trunk.c:3166
#define CONN_STATE_TRANSITION(_new, _log)
Definition trunk.c:455
static uint64_t trunk_requests_per_connection(uint16_t *conn_count_out, uint32_t *req_conn_out, trunk_t *trunk, fr_time_t now, NDEBUG_UNUSED bool verify)
Update timestamps for when we last had a transition from above target to below target or vice versa.
Definition trunk.c:4587
static size_t trunk_connection_events_len
Definition trunk.c:441
static void _trunk_connection_on_failed(connection_t *conn, connection_state_t prev, connection_state_t state, void *uctx)
Connection failed.
Definition trunk.c:3596
bool oneshot
Remove the function after it's called once.
Definition trunk.c:189
bool started
Has the trunk been started.
Definition trunk.c:307
static size_t trunk_states_len
Definition trunk.c:411
#define TCONN_DLIST_VERIFY(_dlist, _state)
#define IO_FUNC_VERIFY(_func)
uint32_t trunk_request_count_by_connection(trunk_connection_t const *tconn, int req_state)
Return the count number of requests associated with a trunk connection.
Definition trunk.c:2895
uint64_t last_req_per_conn
The last request to connection ratio we calculated.
Definition trunk.c:312
#define DO_REQUEST_COMPLETE(_treq)
Call the complete callback (if set)
Definition trunk.c:573
static void trunk_connection_auto_full(trunk_connection_t *tconn)
Automatically mark a connection as full.
Definition trunk.c:2914
static void trunk_connection_remove(trunk_connection_t *tconn)
Remove a trunk connection from whichever list it's currently in.
Definition trunk.c:3065
#define TRUNK_REQUEST_STATE_LOG_MAX
The maximum number of state logs to record per request.
Definition trunk.c:71
static void trunk_connection_writable(trunk_connection_t *tconn)
A connection is writable.
Definition trunk.c:2978
#define OVER_MAX_CHECK
trunk_connection_event_t events
The current events we expect to be notified on.
Definition trunk.c:147
trunk_watch_entry_t * trunk_add_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch, bool oneshot, void const *uctx)
Add a watch entry to the trunk state list.
Definition trunk.c:879
static int _trunk_free(trunk_t *trunk)
Free a trunk, gracefully closing all connections.
Definition trunk.c:4878
fr_dlist_head_t failed
Connections that'll be reconnected shortly.
Definition trunk.c:256
static void trunk_rebalance(trunk_t *trunk)
Rebalance connections across active trunk members when a new connection becomes active.
Definition trunk.c:4115
static void trunk_backlog_drain(trunk_t *trunk)
Drain the backlog of as many requests as possible.
Definition trunk.c:4686
#define DO_REQUEST_CANCEL(_treq, _reason)
Call the cancel callback if set.
Definition trunk.c:534
static int8_t _trunk_connection_order_by_shortest_queue(void const *one, void const *two)
Order connections by queue depth.
Definition trunk.c:4861
struct trunk_request_pub_s pub
Public fields in the trunk request.
Definition trunk.c:100
#define TCONN_MINMAX_HEAP_VERIFY(_heap, _state)
trunk_request_t * cancel_partial
Partially written cancellation request.
Definition trunk.c:163
#define TCONN_MINMAX_HEAP_SEARCH(_heap)
uint64_t trunk_connection_requests_requeue(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Move requests off of a connection and requeue elsewhere.
Definition trunk.c:2026
bool enabled
Whether the watch entry is enabled.
Definition trunk.c:190
fr_time_t last_freed
Last time this request was freed.
Definition trunk.c:113
#define DO_REQUEST_CONN_RELEASE(_treq)
Call the "conn_release" callback (if set)
Definition trunk.c:555
#define TREQ_DLIST_SEARCH(_dlist)
#define REQUEST_EXTRACT_CANCEL(_treq)
Remove the current request from the cancel list.
Definition trunk.c:770
static bool trunk_connection_is_full(trunk_connection_t *tconn)
Return whether a trunk connection should currently be considered full.
Definition trunk.c:2937
struct trunk_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:216
trunk_cancel_reason_t cancel_reason
Why this request was cancelled.
Definition trunk.c:111
#define REQUEST_BAD_STATE_TRANSITION(_new)
Definition trunk.c:500
trunk_enqueue_t trunk_request_enqueue_on_conn(trunk_request_t **treq_out, trunk_connection_t *tconn, request_t *request, void *preq, void *rctx, bool ignore_limits)
Enqueue additional requests on a specific connection.
Definition trunk.c:2757
static void _trunk_connection_on_closed(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection failed after it was connected.
Definition trunk.c:3525
static fr_table_num_ordered_t const trunk_connection_events[]
Definition trunk.c:435
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2603
#define TCONN_DLIST_SEARCH(_dlist)
static void trunk_request_enter_unassigned(trunk_request_t *treq)
Transition a request to the unassigned state, in preparation for re-assignment.
Definition trunk.c:1071
struct trunk_request_s trunk_request_t
Definition trunk.c:33
void * in_handler
Which handler we're inside.
Definition trunk.c:278
bool freeing
Trunk is being freed, don't spawn new connections or re-enqueue.
Definition trunk.c:304
static fr_table_num_ordered_t const trunk_states[]
Definition trunk.c:406
static void trunk_connection_readable(trunk_connection_t *tconn)
A connection is readable.
Definition trunk.c:2968
#define IS_SERVICEABLE(_tconn)
Definition trunk.c:725
trunk_enqueue_t trunk_request_requeue(trunk_request_t *treq)
Re-enqueue a request on the same connection.
Definition trunk.c:2692
#define IS_PROCESSING(_tconn)
Definition trunk.c:726
#define RECONNECT_BY_STATE(_state, _list)
static void trunk_connection_enter_draining(trunk_connection_t *tconn)
Transition a connection to the draining state.
Definition trunk.c:3196
static fr_table_num_indexed_bit_pos_t const trunk_req_trigger_names[]
Map request states to trigger names.
Definition trunk.c:371
fr_dlist_t entry
Used to track the trunk request in the conn->sent or trunk->backlog request.
Definition trunk.c:108
static void trunk_connection_close_if_empty(trunk_t *trunk, fr_dlist_head_t *head)
Close connections in a particular connection list if they have no requests associated with them.
Definition trunk.c:4073
void trunk_request_signal_cancel_complete(trunk_request_t *treq)
Signal that a remote server acked our cancellation.
Definition trunk.c:2300
static trunk_enqueue_t trunk_request_check_enqueue(trunk_connection_t **tconn_out, trunk_t *trunk, request_t *request)
Check to see if a trunk request can be enqueued.
Definition trunk.c:1618
#define DO_REQUEST_MUX(_tconn)
Write one or more requests to a connection.
Definition trunk.c:630
#define REQUEST_EXTRACT_PARTIAL(_treq)
Remove the current request from the partial slot.
Definition trunk.c:751
fr_dlist_head_t sent
Sent request.
Definition trunk.c:157
static void trunk_request_enter_partial(trunk_request_t *treq)
Transition a request to the partial state, indicating that it has been partially sent.
Definition trunk.c:1239
fr_timer_t * lifetime_ev
Maximum time this connection can be open.
Definition trunk.c:178
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3899
fr_dlist_head_t connecting
Connections which are not yet in the open state.
Definition trunk.c:242
#define TRUNK_STATE_TRANSITION(_new)
Definition trunk.c:899
void trunk_request_signal_cancel(trunk_request_t *treq)
Cancel a trunk request.
Definition trunk.c:2168
void trunk_request_state_log_entry_add(char const *function, int line, trunk_request_t *treq, trunk_request_state_t new)
Definition trunk.c:2813
static int trunk_connection_spawn(trunk_t *trunk, fr_time_t now)
Attempt to spawn a new connection.
Definition trunk.c:3764
int trunk_del_watch(trunk_t *trunk, trunk_state_t state, trunk_watch_t watch)
Remove a watch function from a trunk state list.
Definition trunk.c:845
static void _trunk_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Event to periodically call the connection management function.
Definition trunk.c:4516
struct trunk_connection_pub_s pub
Public fields in the trunk connection.
Definition trunk.c:134
static void trunk_request_enter_reapable(trunk_request_t *treq)
Transition a request to the reapable state, indicating that it's been sent in its entirety, but no response is expected.
Definition trunk.c:1326
uint16_t trunk_connection_count_by_state(trunk_t *trunk, int conn_state)
Return the number of connections in the specified states.
Definition trunk.c:2871
#define IN_REQUEST_DEMUX(_trunk)
Definition trunk.c:722
#define DO_REQUEST_FAIL(_treq, _prev_state)
Call the fail callback (if set)
Definition trunk.c:592
static void trunk_request_enter_cancel(trunk_request_t *treq, trunk_cancel_reason_t reason)
Transition a request to the cancel state, placing it in a connection's cancellation list.
Definition trunk.c:1389
static trunk_enqueue_t trunk_request_enqueue_existing(trunk_request_t *treq)
Enqueue a request which has never been assigned to a connection or was previously cancelled.
Definition trunk.c:1696
bool managing_connections
Whether the trunk is allowed to manage (open/close) connections.
Definition trunk.c:309
#define DO_CONNECTION_ALLOC(_tconn)
Allocate a new connection.
Definition trunk.c:681
char const * function
State change occurred in.
Definition trunk.c:91
static size_t trunk_request_states_len
Definition trunk.c:404
fr_dlist_head_t init
Connections which have not yet started connecting.
Definition trunk.c:239
fr_dlist_head_t * log_head
To allow the log entry to remove itself on free.
Definition trunk.c:77
static void trunk_request_enter_cancel_partial(trunk_request_t *treq)
Transition a request to the cancel_partial state, placing it in a connection's cancel_partial slot.
Definition trunk.c:1440
static void _trunk_connection_on_connected(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connected state.
Definition trunk.c:3466
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start, fr_pair_list_t *trigger_args)
Allocate a new collection of connections.
Definition trunk.c:4963
fr_dlist_head_t to_free
Connections we're done with and will free on the next call to trunk_manage.
Definition trunk.c:267
trunk_request_t * partial
Partially written request.
Definition trunk.c:155
static void trunk_request_enter_failed(trunk_request_t *treq)
Request failed, inform the API client and free the request.
Definition trunk.c:1582
fr_minmax_heap_t * active
Connections which can service requests.
Definition trunk.c:244
conf_parser_t const trunk_config[]
Config parser definitions to populate a trunk_conf_t.
Definition trunk.c:341
static void trunk_request_enter_complete(trunk_request_t *treq)
Request completed successfully, inform the API client and free the request.
Definition trunk.c:1551
static void trunk_request_enter_sent(trunk_request_t *treq)
Transition a request to the sent state, indicating that it's been sent in its entirety.
Definition trunk.c:1269
#define DO_REQUEST_CANCEL_MUX(_tconn)
Write one or more cancellation requests to a connection.
Definition trunk.c:663
static void trunk_connection_enter_full(trunk_connection_t *tconn)
Transition a connection to the full state.
Definition trunk.c:3121
void trunk_request_free(trunk_request_t **treq_to_free)
If the trunk request is freed then update the target requests.
Definition trunk.c:2338
#define DO_REQUEST_DEMUX(_tconn)
Read one or more requests from a connection.
Definition trunk.c:647
static uint64_t trunk_connection_requests_dequeue(fr_dlist_head_t *out, trunk_connection_t *tconn, int states, uint64_t max)
Shift requests in the specified states onto new connections.
Definition trunk.c:1753
static int _trunk_request_free(trunk_request_t *treq)
Actually free the trunk request.
Definition trunk.c:2457
char const * log_prefix
What to prepend to messages.
Definition trunk.c:220
#define REQUEST_EXTRACT_PENDING(_treq)
Remove the current request from the pending list.
Definition trunk.c:741
static void _trunk_connection_lifetime_expire(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
Trigger a reconnection of the trunk connection.
Definition trunk.c:3448
static void trunk_connection_event_update(trunk_connection_t *tconn)
Update the registrations for I/O events we're interested in.
Definition trunk.c:3001
static conf_parser_t const trunk_config_request[]
Definition trunk.c:324
fr_dlist_head_t full
Connections which have too many outstanding requests.
Definition trunk.c:246
#define DEQUEUE_ALL(_src_list, _state)
static void trunk_request_enter_backlog(trunk_request_t *treq, bool new)
Transition a request to the backlog state, adding it to the backlog of the trunk.
Definition trunk.c:1106
static fr_table_num_ordered_t const trunk_request_states[]
Definition trunk.c:389
static void _trunk_connection_on_connecting(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the connecting state.
Definition trunk.c:3370
static fr_table_num_indexed_bit_pos_t const trunk_conn_trigger_names[]
Map connection states to trigger names.
Definition trunk.c:198
fr_dlist_head_t draining_to_free
Connections that will be freed once all their requests are complete.
Definition trunk.c:264
uint64_t id
Trunk request ID.
Definition trunk.c:104
uint64_t sent_count
The number of requests that have been sent using this connection.
Definition trunk.c:171
static void _trunk_connection_on_init(UNUSED connection_t *conn, UNUSED connection_state_t prev, UNUSED connection_state_t state, void *uctx)
Connection transitioned to the init state.
Definition trunk.c:3335
#define DO_CONNECTION_NOTIFY(_tconn, _events)
Change what events the connection should be notified about.
Definition trunk.c:703
#define TREQ_DLIST_VERIFY(_dlist, _state)
fr_dlist_head_t inactive
Connections which have been signalled to be inactive by the API client.
Definition trunk.c:249
bool trigger_undef[NUM_ELEMENTS(trunk_conn_trigger_names)]
Record that a specific trigger is undefined.
Definition trunk.c:317
void trunk_connection_manage_stop(trunk_t *trunk)
Stop the trunk from opening and closing connections in response to load.
Definition trunk.c:4835
#define TREQ_HEAP_VERIFY(_heap, _state)
void trunk_connection_signal_active(trunk_connection_t *tconn)
Signal a trunk connection is no longer full.
Definition trunk.c:3976
fr_dlist_head_t log
State change log.
Definition trunk.c:123
uint64_t tconn_id
If the treq was associated with a connection the connection ID.
Definition trunk.c:85
fr_dlist_t entry
Used to track the connection in the connecting, full and failed lists.
Definition trunk.c:141
static void trunk_request_enter_cancel_sent(trunk_request_t *treq)
Transition a request to the cancel_sent state, placing it in a connection's cancel_sent list.
Definition trunk.c:1475
static void trunk_connection_enter_inactive(trunk_connection_t *tconn)
Transition a connection to the inactive state.
Definition trunk.c:3143
trunk_request_state_t from
What state we transitioned from.
Definition trunk.c:79
fr_pair_list_t * trigger_args
Passed to trigger.
Definition trunk.c:315
fr_dlist_head_t cancel_sent
Sent cancellation request.
Definition trunk.c:165
void trunk_connection_manage_start(trunk_t *trunk)
Allow the trunk to open and close connections in response to load.
Definition trunk.c:4824
fr_dlist_head_t inactive_draining
Connections which have been signalled to be inactive by the API client, which the trunk manager is draining.
Definition trunk.c:252
void trunk_connection_signal_inactive(trunk_connection_t *tconn)
Signal a trunk connection cannot accept more requests.
Definition trunk.c:3953
static int _state_log_entry_free(trunk_request_state_log_t *slog)
Used for sanity checks to ensure all log entries have been freed.
Definition trunk.c:2806
void trunk_connection_verify(char const *file, int line, trunk_connection_t *tconn)
Definition trunk.c:5132
fr_heap_t * backlog
The request backlog.
Definition trunk.c:229
#define IN_REQUEST_CANCEL_MUX(_trunk)
Definition trunk.c:723
void trunk_request_verify(char const *file, int line, trunk_request_t *treq)
Definition trunk.c:5192
uint64_t trunk_request_count_by_state(trunk_t *trunk, int conn_state, int req_state)
Return a count of requests on a connection in a specific state.
Definition trunk.c:4538
void trunk_request_signal_cancel_partial(trunk_request_t *treq)
Signal a partial cancel write.
Definition trunk.c:2252
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2066
#define COUNT_BY_STATE(_state, _list)
void * uctx
Uctx data to pass to alloc.
Definition trunk.c:280
#define TREQ_OPTION_VERIFY(_option, _state)
bool trunk_connection_search(trunk_connection_t *tconn, void *ptr)
Definition trunk.c:5248
#define CONN_BAD_STATE_TRANSITION(_new)
Definition trunk.c:466
fr_heap_index_t heap_id
Used to track the request conn->pending heap.
Definition trunk.c:106
#define REQUEST_STATE_TRANSITION(_new)
Record a request state transition and log appropriate output.
Definition trunk.c:489
trunk_watch_entry_t * next_watcher
Watcher about to be run. Used to prevent nested watchers.
Definition trunk.c:284
static uint64_t trunk_connection_requests_requeue_priv(trunk_connection_t *tconn, int states, uint64_t max, bool fail_bound)
Remove requests in specified states from a connection, attempting to distribute them to new connections.
Definition trunk.c:1864
bool sent
Trunk request has been sent at least once.
Definition trunk.c:118
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2110
static void trunk_connection_auto_unfull(trunk_connection_t *tconn)
Automatically mark a connection as active or reconnect it.
Definition trunk.c:2955
void trunk_connection_signal_reconnect(trunk_connection_t *tconn, connection_reason_t reason)
Signal a trunk connection is no longer viable.
Definition trunk.c:4015
void trunk_connection_signal_writable(trunk_connection_t *tconn)
Signal that a trunk connection is writable.
Definition trunk.c:3919
bool trunk_request_search(trunk_request_t *treq, void *ptr)
Definition trunk.c:5306
fr_dlist_t entry
List entry.
Definition trunk.c:186
static conf_parser_t const trunk_config_connection[]
Definition trunk.c:333
trunk_connection_state_t tconn_state
If the treq was associated with a connection the connection state at the time of the state transition...
Definition trunk.c:87
bool bound_to_conn
Fail the request if there's an attempt to re-enqueue it.
Definition trunk.c:115
static size_t trunk_cancellation_reasons_len
Definition trunk.c:433
static fr_table_num_ordered_t const trunk_cancellation_reasons[]
Definition trunk.c:427
static size_t trunk_conn_trigger_names_len
Definition trunk.c:210
fr_event_list_t * el
Event list used by this trunk and the connection.
Definition trunk.c:222
void trunk_request_state_log(fr_log_t const *log, fr_log_type_t log_type, char const *file, int line, trunk_request_t const *treq)
Definition trunk.c:2844
#define IN_REQUEST_MUX(_trunk)
Definition trunk.c:721
fr_dlist_head_t free_requests
Requests in the unassigned state.
Definition trunk.c:226
bool trunk_connection_in_state(trunk_connection_t *tconn, int state)
Returns true if the trunk connection is in one of the specified states.
Definition trunk.c:4063
#define TREQ_HEAP_SEARCH(_heap)
#define REQUEST_EXTRACT_CANCEL_SENT(_treq)
Remove the current request from the cancel sent list.
Definition trunk.c:784
fr_dlist_t entry
Entry in the linked list.
Definition trunk.c:78
void trunk_request_signal_reapable(trunk_request_t *treq)
Signal that the request was written to a connection successfully, but no response is expected.
Definition trunk.c:2088
Associates request queues with a connection.
Definition trunk.c:133
Wraps a normal request.
Definition trunk.c:99
Trace state machine changes for a particular request.
Definition trunk.c:76
Main trunk management handle.
Definition trunk.c:215
An entry in a trunk watch function list.
Definition trunk.c:185
uint16_t max
Maximum number of connections in the trunk.
Definition trunk.h:231
uint32_t max_req_per_conn
Maximum requests per connection.
Definition trunk.h:240
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:320
trunk_t *_CONST trunk
Trunk this request belongs to.
Definition trunk.h:351
bool backlog_on_failed_conn
Assign requests to the backlog when there are no available connections and the last connection event was a failure.
Definition trunk.h:281
uint16_t min
Shouldn't let connections drop below this number.
Definition trunk.h:229
#define TRUNK_REQUEST_STATE_ALL
All request states.
Definition trunk.h:195
void *_CONST rctx
Resume ctx of the module.
Definition trunk.h:357
trunk_t *_CONST trunk
Trunk this connection belongs to.
Definition trunk.h:379
fr_heap_cmp_t connection_prioritise
Ordering function for connections.
Definition trunk.h:741
trunk_connection_state_t
Used for sanity checks and to track which list the connection is in.
Definition trunk.h:87
@ TRUNK_CONN_FULL
Connection is full and can't accept any more requests.
Definition trunk.h:95
@ TRUNK_CONN_CONNECTING
Connection is connecting.
Definition trunk.h:90
@ TRUNK_CONN_DRAINING
Connection will be closed once it has no more outstanding requests, if it's not reactivated.
Definition trunk.h:101
@ TRUNK_CONN_INACTIVE_DRAINING
Connection is inactive, can't accept any more requests, and will be closed once it has no more outstanding requests.
Definition trunk.h:97
@ TRUNK_CONN_INACTIVE
Connection is inactive and can't accept any more requests.
Definition trunk.h:96
@ TRUNK_CONN_HALTED
Halted, ready to be freed.
Definition trunk.h:88
@ TRUNK_CONN_CLOSED
Connection was closed, either explicitly or due to failure.
Definition trunk.h:94
@ TRUNK_CONN_INIT
In the initial state.
Definition trunk.h:89
@ TRUNK_CONN_DRAINING_TO_FREE
Connection will be closed once it has no more outstanding requests.
Definition trunk.h:103
@ TRUNK_CONN_ACTIVE
Connection is connected and ready to service requests.
Definition trunk.h:91
unsigned req_pool_headers
How many chunk headers the talloc pool allocated with the treq should contain.
Definition trunk.h:266
request_t *_CONST request
The request that we're writing the data on behalf of.
Definition trunk.h:359
fr_time_t _CONST last_open
Last time the connection management function opened a connection.
Definition trunk.h:310
fr_time_delta_t idle_timeout
How long a connection can remain idle for.
Definition trunk.h:250
trunk_connection_state_t _CONST state
What state the connection is in.
Definition trunk.h:371
size_t req_pool_size
The size of the talloc pool allocated with the treq.
Definition trunk.h:269
uint64_t max_uses
The maximum number of times a connection can be used.
Definition trunk.h:246
fr_time_delta_t lifetime
Time between reconnects.
Definition trunk.h:248
uint16_t connecting
Maximum number of connections that can be in the connecting state.
Definition trunk.h:233
uint64_t _CONST req_alloc_reused
How many requests were reused.
Definition trunk.h:334
uint32_t max_backlog
Maximum number of requests that can be in the backlog.
Definition trunk.h:244
fr_time_t _CONST last_failed
Last time a connection failed.
Definition trunk.h:318
trunk_request_state_t _CONST state
Which list the request is now located in.
Definition trunk.h:349
fr_time_t _CONST last_write_success
Last time we wrote to the connection.
Definition trunk.h:375
trunk_connection_t *_CONST tconn
Connection this request belongs to.
Definition trunk.h:353
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:737
fr_time_t _CONST last_read_success
Last time we read a response.
Definition trunk.h:322
fr_time_t _CONST last_below_target
Last time average utilisation went below the target value.
Definition trunk.h:307
fr_time_t _CONST last_read_success
Last time we read from the connection.
Definition trunk.h:377
fr_time_delta_t close_delay
How long we must be below target utilisation to close an existing connection.
Definition trunk.h:255
uint16_t start
How many connections to start.
Definition trunk.h:227
fr_time_delta_t req_cleanup_delay
How long must a request in the unassigned (free) list not have been used for before it's cleaned up a...
Definition trunk.h:259
#define TRUNK_REQUEST_STATE_CANCEL_ALL
All requests in various cancellation states.
Definition trunk.h:213
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:271
trunk_connection_event_t
What type of I/O events the trunk connection is currently interested in receiving.
Definition trunk.h:72
@ TRUNK_CONN_EVENT_BOTH
Trunk should be notified if a connection is readable or writable.
Definition trunk.h:79
@ TRUNK_CONN_EVENT_WRITE
Trunk should be notified if a connection is writable.
Definition trunk.h:77
@ TRUNK_CONN_EVENT_NONE
Don't notify the trunk on connection state changes.
Definition trunk.h:73
@ TRUNK_CONN_EVENT_READ
Trunk should be notified if a connection is readable.
Definition trunk.h:75
#define TRUNK_CONN_ALL
All connection states.
Definition trunk.h:111
fr_heap_cmp_t request_prioritise
Ordering function for requests.
Definition trunk.h:743
uint64_t _CONST req_alloc
The number of requests currently allocated that have not been freed or returned to the free list.
Definition trunk.h:328
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_REQUEUE
A previously sent request is being requeued.
Definition trunk.h:59
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
uint64_t _CONST req_alloc_new
How many requests we've allocated.
Definition trunk.h:332
fr_time_delta_t open_delay
How long we must be above target utilisation to spawn a new connection.
Definition trunk.h:252
connection_t *_CONST conn
The underlying connection.
Definition trunk.h:373
trunk_state_t
Definition trunk.h:62
@ TRUNK_STATE_MAX
Definition trunk.h:66
@ TRUNK_STATE_PENDING
Trunk has connections, but none are active.
Definition trunk.h:65
@ TRUNK_STATE_ACTIVE
Trunk has active connections.
Definition trunk.h:64
@ TRUNK_STATE_IDLE
Trunk has no connections.
Definition trunk.h:63
fr_time_t _CONST last_closed
Last time the connection management function closed a connection.
Definition trunk.h:313
void(* trunk_watch_t)(trunk_t *trunk, trunk_state_t prev, trunk_state_t state, void *uctx)
Receive a notification when a trunk enters a particular state.
Definition trunk.h:728
fr_time_delta_t manage_interval
How often we run the management algorithm to open/close connections.
Definition trunk.h:263
trunk_enqueue_t
Definition trunk.h:148
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:153
@ TRUNK_ENQUEUE_FAIL
General failure.
Definition trunk.h:154
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:150
@ TRUNK_ENQUEUE_NO_CAPACITY
At maximum number of connections, and no connection has capacity.
Definition trunk.h:151
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:149
void *_CONST preq
Data for the muxer to write to the connection.
Definition trunk.h:355
uint32_t target_req_per_conn
How many pending requests should ideally be running on each connection.
Definition trunk.h:236
fr_time_t _CONST last_connected
Last time a connection connected.
Definition trunk.h:316
trunk_request_cancel_mux_t request_cancel_mux
Write one or more cancellation requests to a connection.
Definition trunk.h:750
trunk_request_state_t
Used for sanity checks and to simplify freeing.
Definition trunk.h:161
@ TRUNK_REQUEST_STATE_PARTIAL
Some of the request was written to the socket, more of it should be written later.
Definition trunk.h:170
@ TRUNK_REQUEST_STATE_REAPABLE
Request has been written, needs to persist, but we are not currently waiting for any response.
Definition trunk.h:173
@ TRUNK_REQUEST_STATE_UNASSIGNED
Transition state - Request currently not assigned to any connection.
Definition trunk.h:165
@ TRUNK_REQUEST_STATE_INIT
Initial state.
Definition trunk.h:162
@ TRUNK_REQUEST_STATE_CANCEL_SENT
We've informed the remote server that the request has been cancelled.
Definition trunk.h:185
@ TRUNK_REQUEST_STATE_COMPLETE
The request is complete.
Definition trunk.h:182
@ TRUNK_REQUEST_STATE_FAILED
The request failed.
Definition trunk.h:183
@ TRUNK_REQUEST_STATE_CANCEL
A request on a particular socket was cancelled.
Definition trunk.h:184
@ TRUNK_REQUEST_STATE_CANCEL_PARTIAL
We partially wrote a cancellation request.
Definition trunk.h:187
@ TRUNK_REQUEST_STATE_BACKLOG
In the backlog.
Definition trunk.h:167
@ TRUNK_REQUEST_STATE_CANCEL_COMPLETE
Remote server has acknowledged our cancellation.
Definition trunk.h:188
@ TRUNK_REQUEST_STATE_PENDING
In the queue of a connection and is pending writing.
Definition trunk.h:168
@ TRUNK_REQUEST_STATE_SENT
Was written to a socket. Waiting for a response.
Definition trunk.h:172
trunk_state_t _CONST state
Current state of the trunk.
Definition trunk.h:337
fr_time_t _CONST last_above_target
Last time average utilisation went above the target value.
Definition trunk.h:304
Common configuration parameters for a trunk.
Definition trunk.h:224
Public fields for the trunk connection.
Definition trunk.h:370
I/O functions to pass to trunk_alloc.
Definition trunk.h:736
Public fields for the trunk.
Definition trunk.h:300
Public fields for the trunk request.
Definition trunk.h:348
static fr_event_list_t * el
static fr_slen_t head
Definition xlat.h:420
static fr_slen_t parent
Definition pair.h:841
char const * fr_strerror(void)
Get the last library error.
Definition strerror.c:553
#define fr_box_time_delta(_val)
Definition value.h:365