The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
pipeline.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 2cf5735c8be7881c78af69da0b0ce96a85a1ef0d $
19 * @file lib/redis/pipeline.c
20 * @brief Functions for pipelining commands.
21 *
22 * @copyright 2019 The FreeRADIUS server project
23 * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24 *
25 * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26 */
27
28#include <freeradius-devel/server/connection.h>
29#include <freeradius-devel/server/trunk.h>
30
31#include "pipeline.h"
32#include "io.h"
33
34
35/** Thread local state for a cluster
36 *
37 * MOVE ME TO NEW ASYNC CLUSTER CODE
38 */
41 trunk_conf_t const *tconf; //!< Configuration for all trunks in the cluster.
42 char *log_prefix; //!< Common log prefix to use for all cluster related
43 ///< messages.
44 bool delay_start; //!< Prevent connections from spawning immediately.
45};
46
47/** The thread local free list
48 *
49 * Any entries remaining in the list will be freed when the thread is joined
50 */
52
53typedef enum {
54 FR_REDIS_COMMAND_NORMAL = 0, //!< A normal, non-transactional command.
55 FR_REDIS_COMMAND_TRANSACTION_START, //!< Start of a transaction block. Either WATCH or MULTI.
56 ///< if a transaction is started with WATCH, then multi
57 ///< is not marked up as a transaction start.
58 FR_REDIS_COMMAND_TRANSACTION_END //!< End of a transaction block. Either EXEC or DISCARD.
59 ///< If this command fails with
60 ///< MOVED or ASK, all commands back to the previous
61 ///< MULTI command must be requeued.
63
64/** Represents a single command
65 *
66 */
68 fr_redis_command_set_t *cmds; //!< Command set this entry belongs to.
69 fr_dlist_t entry; //!< Entry in the command buffer.
70
71 fr_redis_command_type_t type; //!< Redis command type.
72
73 char const *str; //!< The command string.
74 size_t len; //!< Length of the command string.
75
76 uint64_t sqn; //!< The sequence number of the command. This is only
77 ///< valid for a specific handle, and is unique within
78 ///< the handle.
79
80 redisReply *result; //!< The result from the REDIS server.
81};
82
83/** Represents a collection of pipelined commands
84 *
85 * Commands MUST map to the same cluster node if using clustering.
86 */
89
90 /** @name Command state lists
91 * @{
92 */
93 fr_dlist_head_t pending; //!< Commands yet to be sent.
94 fr_dlist_head_t sent; //!< Commands sent.
95 fr_dlist_head_t completed; //!< Commands complete with replies.
96 /** @} */
97
98 uint8_t redirected; //!< How many times this command set was redirected.
99
100 /** @name Request state
101 *
102 * treq and request are duplicated here with the trunk code.
103 * The reason for this is that a fr_redis_command_set_t may need to be transferred
104 * between trunks when redirects are being followed, and so we need this information
105 * encapsulated within the command set, not just within the trunk.
106 * @{
107 */
108 trunk_request_t *treq; //!< Trunk request this command set is associated with.
109 request_t *request; //!< Request this commands set is associated with (if any).
110 void *rctx; //!< Resume context to write results to.
111 /** @} */
112
113 /** @name Callback functions
114 * @{
115 */
116 fr_redis_command_set_complete_t complete; //!< Notify the creator of the command set
117 ///< that the command set has executed to
118 ///< completion. We have results for
119 ///< all commands.
120
121 fr_redis_command_set_fail_t fail; //!< Notify the creator of the command set
122 ///< that the command set failed to execute
123 ///< to completion. Partial results will
124 ///< be available.
125 /** @} */
126
127 /** @name Command set transaction stats
128 *
129 * We do these checks as REDIS commands from a great number of requests may pipeline
130 * requests on the same connection and leaving a transaction open would be fairly
131 * catastrophic, potentially causing errors across all future command sets sent to
132 * the connection.
133 * @{
134 */
135 bool txn_watch; //!< Transaction was started with a watch statement.
136 uint16_t txn_start; //!< Number of times a transaction block was started
137 ///< in this command set.
138 uint16_t txn_end; //!< The number of times a transaction block ended
139 ///< in this command set.
140
141 /** @} */
142};
143
145 fr_redis_io_conf_t const *io_conf; //!< Redis I/O configuration. Specifies how to connect
146 ///< to the host this trunk is used to communicate with.
147 trunk_t *trunk; //!< Trunk containing all the connections to a specific
148 ///< host.
149 fr_redis_cluster_thread_t *cluster; //!< Cluster this trunk belongs to.
150};
151
152/** Free any free requests when the thread is joined
153 *
154 */
156{
157 fr_dlist_head_t *list = talloc_get_type_abort(arg, fr_dlist_head_t);
159
160 /*
161 * See the destructor for why this works
162 */
163 while ((cmds = fr_dlist_head(list))) if (talloc_free(cmds) < 0) return -1;
164 return talloc_free(list);
165}
166
167/** Free a command set
168 * Runs as the talloc destructor that fr_redis_command_set_alloc() installs on newly allocated command sets. Returns 0 (talloc releases the set) when the thread local free list already holds 1024 entries, or on the early-return path below; otherwise frees the set's children, zeroes the set and returns -1 so talloc keeps the memory.
169 */
171{
172 if (fr_dlist_num_elements(command_set_free_list) >= 1024) return 0; /* Keep a buffer of 1024 */
173
174 /*
175 * Freed from the free list....
176 */
178 fr_dlist_entry_unlink(&cmds->entry); /* Don't trust the list head to be available */
179 return 0;
180 }
181
182 talloc_free_children(cmds);
183 memset(cmds, 0, sizeof(*cmds)); /* Zero the command set itself; memset(&cmds, 0, sizeof(cmds)) only nulled the local pointer */
184
186
187 return -1; /* Prevent the free */
188}
189
190/** Allocate a new command set
191 *
192 * This is a set of commands that the calling module wants to execute
193 * on the redis server in sequence.
194 *
195 * Control will be returned to the caller via the registered complete
196 * and fail functions.
197 *
198 * @param[in] ctx to bind the command set's lifetime to.
199 * @param[in] request to pass to places that need it.
200 * @param[in] complete Function to call when all commands have been processed.
201 * @param[in] fail Function to call if the command set was not executed
202 * or was partially executed.
203 * @param[in] rctx Resume context to pass to complete and fail functions.
204 * @return A new or refurbished command set.
205 */
207 request_t *request,
210 void *rctx)
211
212{
214 fr_dlist_head_t *free_list;
215
216#define COMMAND_PRE_ALLOC_COUNT 8 //!< How much room we pre-allocate for commands.
217#define COMMAND_PRE_ALLOC_LEN 64 //!< How much we allocate for each command string.
218
219 /*
220 * Initialise the free list
221 */
223 MEM(free_list = talloc(NULL, fr_dlist_head_t));
224 fr_dlist_init(free_list, fr_redis_command_set_t, entry);
226 } else {
227 free_list = command_set_free_list;
228 }
229
230 /*
231 * Pull an element out of the free list
232 * or allocate a new one.
233 */
234 cmds = fr_dlist_head(free_list);
235 if (!cmds) {
240 talloc_set_destructor(cmds, _redis_command_set_free);
242 } else {
243 fr_dlist_remove(free_list, cmds);
244 }
245
249 cmds->request = request;
250 cmds->complete = complete;
251 cmds->fail = fail;
252 cmds->rctx = rctx;
253
254 if (ctx) talloc_link_ctx(ctx, cmds);
255
256 return cmds;
257}
258
259/** Free any result associated with the command
260 *
261 * @param[in] cmd to free. Frees any redis results associated with the command.
262 */
264{
265 //if (cmd->result) fr_redis_reply_free(&cmd->result);
266
267 return 0;
268}
269
271{
272 return cmd->result;
273}
274
275/** Add a preformatted/expanded command to the command set
276 *
277 * The command must either be entirely static, or parented by the command set.
278 *
279 * @note Caller should disallow "SUBSCRIBE" et al, if they're not appropriate.
280 * As subscribing to a stream where we're not expecting it would break
281 * things, badly.
282 *
283 * @param[in] cmds Command set to add command to.
284 * @param[in] cmd_str A fully expanded/formatted command to send to redis.
285 * Must be static, or have the same lifetime as the
286 * command set (allocated with the command set as the parent).
287 * @param[in] cmd_len Length of the command.
288 * @return
289 * - FR_REDIS_PIPELINE_BAD_CMDS if a bad command sequence is enqueued.
290 * - FR_REDIS_PIPELINE_OK if command was enqueued successfully.
291 */
293 char const *cmd_str, size_t cmd_len)
294{
295 request_t *request = cmds->request;
298
299 /*
300 * Transaction sanity checks.
301 *
302 * Because commands from many different requests share the same connection
303 * we need to ensure that transaction blocks aren't left dangling and
304 * that the commands are all in the right order.
305 * Each case pre-filters on the lowercased second character before paying for strncasecmp().
306 * We try very hard to do this without incurring a performance penalty
307 * for non-transactional commands.
308 */
309 switch (tolower((unsigned char)cmd_str[0])) { /* cast: passing a negative char to tolower() is undefined */
310 case 'm':
311 if (tolower((unsigned char)cmd_str[1]) != 'u') break; /* lowercase the character, not the result of the comparison */
312 if (strncasecmp(cmd_str, "multi", sizeof("multi") - 1) != 0) break;
313 /*
314 * There should only ever be a difference of
315 * 1 between txn starts and txn ends.
316 */
317 if ((cmds->txn_end < cmds->txn_start) && ((cmds->txn_start - cmds->txn_end) > 1)) {
318 ROPTIONAL(ERROR, REDEBUG, "Too many consecutive \"MULTI\" commands");
320 }
321 /*
322 * If we have a watch before the MULTI,
323 * that's marked as the start of the transaction
324 * block.
325 */
327 cmds->txn_start++; /* Yes MULTI increments start, not WATCH */
328 break;
329
330 case 'e':
331 if (tolower((unsigned char)cmd_str[1]) != 'x') break; /* second character of "exec" is 'x'; testing for 'e' meant EXEC was never matched */
332 if (strncasecmp(cmd_str, "exec", sizeof("exec") - 1) != 0) break;
333 goto txn_end;
334
335 /*
336 * It's useful to allow discard as it allows command syntax checks
337 * to be performed against the REDIS server without actually
338 * executing the commands.
339 */
340 case 'd':
341 if (tolower((unsigned char)cmd_str[1]) != 'i') break; /* lowercase the character, not the result of the comparison */
342 if (strncasecmp(cmd_str, "discard", sizeof("discard") - 1) != 0) break;
343 txn_end:
344 if (cmds->txn_start <= cmds->txn_end) {
345 ROPTIONAL(ERROR, REDEBUG, "Transaction not started, missing \"MULTI\" command");
347 }
349 cmds->txn_end++;
350 break;
351
352 case 'w':
353 if (tolower((unsigned char)cmd_str[1]) != 'a') break; /* lowercase the character, not the result of the comparison */
354 if (strncasecmp(cmd_str, "watch", sizeof("watch") - 1) != 0) break;
355 if (cmds->txn_watch) {
356 ROPTIONAL(ERROR, REDEBUG, "Too many consecutive \"WATCH\" commands");
358 }
359 if (cmds->txn_start > cmds->txn_end) {
360 ROPTIONAL(ERROR, REDEBUG, "\"WATCH\" can only be used before \"MULTI\"");
362 }
364
365 default:
366 break;
367 }
368
369 MEM(cmd = talloc_zero(cmds, fr_redis_command_t));
370 talloc_set_destructor(cmd, _redis_command_free);
371 cmd->cmds = cmds;
372 cmd->type = type;
373 cmd->str = cmd_str;
374 cmd->len = cmd_len;
375 fr_dlist_insert_tail(&cmds->pending, cmd); /* new commands start on the pending list */
376
378}
379
380/** Enqueue a command set on a specific trunk
381 *
382 * The command set may be passed around several trunks before it is complete.
383 * This is to allow it to follow MOVED and ASK responses.
384 *
385 * @param[in] rtrunk to enqueue command set on.
386 * @param[in] cmds Command set to enqueue.
387 * @return
388 * - FR_REDIS_PIPELINE_OK if commands were immediately enqueued or placed in the backlog.
389 * - FR_REDIS_PIPELINE_DST_UNAVAILABLE if the REDIS host is unreachable.
390 * - FR_REDIS_PIPELINE_FAIL any other general error.
391 */
393{
394 if (cmds->txn_start != cmds->txn_end) {
395 ERROR("Refusing to enqueue - Unbalanced transaction start/stop commands");
397 }
398
399 switch (trunk_request_enqueue(&cmds->treq, rtrunk->trunk, cmds->request, cmds, cmds->rctx)) {
400 case TRUNK_ENQUEUE_OK:
403
406
407 default:
409 }
410}
411
412/** Callback for receiving Redis replies
413 *
414 * This is called by hiredis for each response it receives. privdata is set to the
415 * fr_redis_command_t that was sent.
416 *
417 * @note Called only from hiredis, not the trunk itself.
418 *
419 * @param[in] ac The async context the command was enqueued on.
420 * @param[in] vreply redisReply containing the result of the command.
421 * @param[in] privdata fr_redis_command_t that was sent to the Redis server.
422 * The fr_redis_command_t contains a pointer to the
423 * fr_redis_command_set_t which holds the treq which
424 * we use to signal that we have responses for all
425 * commands.
426 */
427static void _redis_pipeline_demux(struct redisAsyncContext *ac, void *vreply, void *privdata)
428{
431 connection_t *conn = talloc_get_type_abort(ac->ev.data, connection_t);
432 fr_redis_handle_t *h = talloc_get_type_abort(conn->h, fr_redis_handle_t);
433 redisReply *reply = vreply;
434 /*
435 * First check if we should ignore the response
436 */
438 DEBUG4("Ignoring response with SQN %"PRIu64, (h->rsp_sqn - 1)); /* Already incremented */
439 fr_redis_reply_free((redisReply **)&reply);
440 return;
441 }
442
443 /*
444 * FIXME - Need to check TRYAGAIN, MOVED etc...
445 * I guess we might want to wait for the end of
446 * the command set to do that.
447 */
448 cmd = talloc_get_type_abort(privdata, fr_redis_command_t);
449 cmds = cmd->cmds;
450 cmd->result = reply;
451
452 fr_dlist_remove(&cmds->sent, cmd);
453 fr_dlist_insert_tail(&cmds->completed, cmd);
454
455 /*
456 * Check if the command set is complete,
457 * and if it is, tell the trunk the treq
458 * is complete.
459 */
460 if ((fr_dlist_num_elements(&cmds->pending) == 0) &&
462}
463
465 connection_conf_t const *conf,
466 char const *log_prefix, void *uctx)
467{
468 fr_redis_trunk_t *rtrunk = talloc_get_type_abort(uctx, fr_redis_trunk_t);
469
470 return fr_redis_connection_alloc(tconn, el, conf, rtrunk->io_conf, log_prefix);
471}
472
473/** Enqueue one or more command sets onto a redis handle
474 *
475 * Because the trunk is in always writable mode, _redis_pipeline_mux
476 * will be called any time trunk_request_enqueue is called, so there'll only
477 * ever be one command to dequeue.
478 *
479 * @param[in] tconn Trunk connection holding the commands to enqueue.
480 * @param[in] conn Connection handle containing the fr_redis_handle_t.
481 * @param[in] uctx fr_redis_cluster_t. Unused.
482 */
483static void _redis_pipeline_mux(trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
484{
485 trunk_request_t *treq;
488 fr_redis_handle_t *h = talloc_get_type_abort(conn->h, fr_redis_handle_t);
489 request_t *request;
490
491 treq = trunk_connection_pop_request(&request, (void *)&cmds, NULL, tconn);
492 while ((cmd = fr_dlist_head(&cmds->pending))) {
493 /*
494 * If this fails it probably means the connection
495 * is disconnecting, but if that's happening then
496 * we shouldn't be enqueueing new requests?
497 */
498 if (unlikely(redisAsyncCommand(h->ac, _redis_pipeline_demux, cmd, "%s", cmd->str) != REDIS_OK)) {
499 ROPTIONAL(ERROR, REDEBUG, "Unexpected error queueing REDIS command");
500
501 while ((cmd = fr_dlist_head(&cmds->sent))) {
503 fr_dlist_remove(&cmds->sent, cmd);
504 fr_dlist_insert_tail(&cmds->pending, cmd);
505 }
507 return;
508 }
510 fr_dlist_remove(&cmds->pending, cmd);
511 fr_dlist_insert_tail(&cmds->sent, cmd);
512 }
514}
515
516/** Deal with cancellation of sent requests
517 *
518 * We can't actually signal redis to not process the request, so depending
519 * on why the commands were cancelled, we either tell the handle to ignore
520 * them, or move them back into the pending list.
521 */
523 trunk_cancel_reason_t reason, UNUSED void *uctx)
524{
525 fr_redis_command_set_t *cmds = talloc_get_type_abort(preq, fr_redis_command_set_t);
526 fr_redis_handle_t *h = conn->h;
527
528 /*
529 * How we cancel is very different depending
530 * on _WHY_ we're cancelling.
531 */
532 switch (reason) {
533 /*
534 * Cancel is only called for requests that
535 * have been sent, and only when the connection
536 * is about to be closed for some reason.
537 *
538 * We don't need to tell the handle to ignore
539 * the responses, we just need to get the
540 * command set back into the correct state for
541 * execution by another handle.
542 */
544 fr_dlist_move(&cmds->pending, &cmds->sent);
545 return;
546
547 /*
548 * If the request was cancelled due to a signal
549 * we'll have a response coming back for a
550 * request, pctx and rctx that no longer exist.
551 * Tell the handle to signal that the response
552 * should be ignored when it's received.
553 *
554 * Free will take care of cleaning up the
555 * pending commands.
556 */
558 {
560
561 for (cmd = fr_dlist_head(&cmds->sent);
562 cmd;
563 cmd = fr_dlist_next(&cmds->sent, cmd)) {
565 }
566 }
567
569 fr_assert(0);
570 return;
571 }
572}
573
574/** Signal the API client that we got a complete set of responses to a command set
575 *
576 */
578 UNUSED void *rctx, UNUSED void *uctx)
579{
580 fr_redis_command_set_t *cmds = talloc_get_type_abort(preq, fr_redis_command_set_t);
581
582 if (cmds->complete) cmds->complete(cmds->request, &cmds->completed, cmds->rctx);
583}
584
585/** Signal the API client that we failed enqueuing the commands
586 * Invokes the command set's fail callback, if one is set, with the command set's own request, its list of completed commands and its rctx; the request, rctx and uctx arguments of this function are unused.
587 */
588static void _redis_pipeline_command_set_fail(UNUSED request_t *request, void *preq,
589 UNUSED void *rctx, UNUSED void *uctx)
590{
591 fr_redis_command_set_t *cmds = talloc_get_type_abort(preq, fr_redis_command_set_t); /* preq is the command set that was passed to trunk_request_enqueue() */
592
593 if (cmds->fail) cmds->fail(cmds->request, &cmds->completed, cmds->rctx); /* the fail callback is optional */
594}
595
596/** Free the command set
597 * Calls talloc_free() on the command set carried in preq; the request and uctx arguments are unused.
598 */
599static void _redis_pipeline_command_set_free(UNUSED request_t *request, void *preq,
600 UNUSED void *uctx)
601{
602 fr_redis_command_set_t *cmds = talloc_get_type_abort(preq, fr_redis_command_set_t); /* preq is the command set that was passed to trunk_request_enqueue() */
603
604 talloc_free(cmds); /* NOTE(review): newly allocated sets have _redis_command_set_free installed as a talloc destructor, which can return -1 to prevent the free - so this may recycle the set rather than release it; confirm */
605}
606
607/** Allocate a new trunk
608 *
609 * @param[in] cluster_thread to allocate the trunk for.
610 * @param[in] io_conf Describing the connection to a single REDIS host.
611 * @return
612 * - On success, a new fr_redis_trunk_t which can be used for pipelining commands.
613 * - NULL on failure.
614 */
616{
617 fr_redis_trunk_t *rtrunk;
620 .request_mux = _redis_pipeline_mux,
621 /* demux called directly by hiredis */
622 .request_cancel = _redis_pipeline_command_set_cancel,
623 .request_complete = _redis_pipeline_command_set_complete,
624 .request_fail = _redis_pipeline_command_set_fail,
626 };
627
628 MEM(rtrunk = talloc_zero(cluster_thread, fr_redis_trunk_t));
629 rtrunk->io_conf = io_conf;
630 rtrunk->trunk = trunk_alloc(rtrunk, cluster_thread->el,
631 &io_funcs, cluster_thread->tconf, cluster_thread->log_prefix, rtrunk,
632 cluster_thread->delay_start);
633 if (!rtrunk->trunk) {
634 talloc_free(rtrunk);
635 return NULL;
636 }
637
638 return rtrunk;
639}
640
641/** Allocate per-thread, per-cluster instance
642 *
643 * This structure represents all the connections for a given thread for a given cluster.
644 * The structures holds the trunk connections to talk to each cluster member.
645 *
646 */
648{
649 fr_redis_cluster_thread_t *cluster_thread;
650 trunk_conf_t *our_tconf;
651
652 MEM(cluster_thread = talloc_zero(ctx, fr_redis_cluster_thread_t));
653 MEM(our_tconf = talloc_memdup(cluster_thread, tconf, sizeof(*tconf)));
654 our_tconf->always_writable = true;
655
656 cluster_thread->el = el;
657 cluster_thread->tconf = our_tconf;
658
659 return cluster_thread;
660}
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
Definition build.h:322
#define unlikely(_x)
Definition build.h:381
#define UNUSED
Definition build.h:315
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
static int fr_dlist_move(fr_dlist_head_t *list_dst, fr_dlist_head_t *list_src)
Merge two lists, inserting the source at the tail of the destination.
Definition dlist.h:763
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:338
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:138
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:555
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition io.c:422
fr_redis_sqn_t rsp_sqn
Current redis response number.
Definition io.h:85
redisAsyncContext * ac
Async handle for hiredis.
Definition io.h:74
static void fr_redis_connection_ignore_response(fr_redis_handle_t *h, fr_redis_sqn_t sqn)
Ignore a response with a specific sequence number.
Definition io.h:106
static bool fr_redis_connection_process_response(fr_redis_handle_t *h)
Update the response sequence number and check if we should ignore the response.
Definition io.h:124
static fr_redis_sqn_t fr_redis_connection_sent_request(fr_redis_handle_t *h)
Tell the handle we sent a command, and get the SQN that command was assigned.
Definition io.h:96
Store I/O state.
Definition io.h:66
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition log.h:528
#define DEBUG4(_fmt,...)
Definition log.h:267
talloc_free(reap)
Stores all information relating to an event list.
Definition event.c:411
unsigned short uint16_t
unsigned char uint8_t
int strncasecmp(char *s1, char *s2, int n)
Definition missing.c:36
static const trunk_io_funcs_t io_funcs
Definition bio.c:2416
Function prototypes and datatypes for the REST (HTTP) transport.
request_t * request
Request this commands set is associated with (if any).
Definition pipeline.c:109
static int _command_set_free_list_free_on_exit(void *arg)
Free any free requests when the thread is joined.
Definition pipeline.c:155
fr_redis_cluster_thread_t * cluster
Cluster this trunk belongs to.
Definition pipeline.c:149
fr_dlist_head_t sent
Commands sent.
Definition pipeline.c:94
uint16_t txn_start
Number of times a transaction block was started in this command set.
Definition pipeline.c:136
fr_redis_cluster_thread_t * fr_redis_cluster_thread_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_conf_t const *tconf)
Allocate per-thread, per-cluster instance.
Definition pipeline.c:647
fr_redis_io_conf_t const * io_conf
Redis I/O configuration.
Definition pipeline.c:145
trunk_conf_t const * tconf
Configuration for all trunks in the cluster.
Definition pipeline.c:41
trunk_t * trunk
Trunk containing all the connections to a specific.
Definition pipeline.c:147
fr_dlist_t entry
Entry in the command buffer.
Definition pipeline.c:69
static connection_t * _redis_pipeline_connection_alloc(trunk_connection_t *tconn, fr_event_list_t *el, connection_conf_t const *conf, char const *log_prefix, void *uctx)
Definition pipeline.c:464
fr_redis_command_set_t * fr_redis_command_set_alloc(TALLOC_CTX *ctx, request_t *request, fr_redis_command_set_complete_t complete, fr_redis_command_set_fail_t fail, void *rctx)
Allocate a new command set.
Definition pipeline.c:206
#define COMMAND_PRE_ALLOC_COUNT
void * rctx
Resume context to write results to.
Definition pipeline.c:110
uint64_t sqn
The sequence number of the command.
Definition pipeline.c:76
fr_redis_command_set_complete_t complete
Notify the creator of the command set that the command set has executed to to completion.
Definition pipeline.c:116
fr_redis_command_set_fail_t fail
Notify the creator of the command set that the command set failed to execute to completion.
Definition pipeline.c:121
char * log_prefix
Common log prefix to use for all cluster related messages.
Definition pipeline.c:42
uint16_t txn_end
The number of times a transaction block ended in this command set.
Definition pipeline.c:138
fr_redis_pipeline_status_t redis_command_set_enqueue(fr_redis_trunk_t *rtrunk, fr_redis_command_set_t *cmds)
Enqueue a command set on a specific trunk.
Definition pipeline.c:392
static int _redis_command_free(fr_redis_command_t *cmd)
Free any result associated with the command.
Definition pipeline.c:263
static void _redis_pipeline_demux(struct redisAsyncContext *ac, void *vreply, void *privdata)
Callback for for receiving Redis replies.
Definition pipeline.c:427
fr_event_list_t * el
Definition pipeline.c:40
#define COMMAND_PRE_ALLOC_LEN
char const * str
The command string.
Definition pipeline.c:73
bool delay_start
Prevent connections from spawning immediately.
Definition pipeline.c:44
static void _redis_pipeline_command_set_free(UNUSED request_t *request, void *preq, UNUSED void *uctx)
Free the command set.
Definition pipeline.c:599
static _Thread_local fr_dlist_head_t * command_set_free_list
The thread local free list.
Definition pipeline.c:51
redisReply * result
The result from the REDIS server.
Definition pipeline.c:80
static void _redis_pipeline_command_set_cancel(connection_t *conn, UNUSED trunk_request_t *treq, void *preq, trunk_cancel_reason_t reason, UNUSED void *uctx)
Deal with cancellation of sent requests.
Definition pipeline.c:522
fr_redis_command_type_t type
Redis command type.
Definition pipeline.c:71
static int _redis_command_set_free(fr_redis_command_set_t *cmds)
Free a command set.
Definition pipeline.c:170
static void _redis_pipeline_command_set_complete(UNUSED request_t *request, void *preq, UNUSED void *rctx, UNUSED void *uctx)
Signal the API client that we got a complete set of responses to a command set.
Definition pipeline.c:577
static void _redis_pipeline_command_set_fail(UNUSED request_t *request, void *preq, UNUSED void *rctx, UNUSED void *uctx)
Signal the API client that we failed enqueuing the commands.
Definition pipeline.c:588
bool txn_watch
Transaction was started with a watch statement.
Definition pipeline.c:135
fr_redis_pipeline_status_t fr_redis_command_preformatted_add(fr_redis_command_set_t *cmds, char const *cmd_str, size_t cmd_len)
Add a preformatted/expanded command to the command set.
Definition pipeline.c:292
fr_redis_trunk_t * fr_redis_trunk_alloc(fr_redis_cluster_thread_t *cluster_thread, fr_redis_io_conf_t const *io_conf)
Allocate a new trunk.
Definition pipeline.c:615
fr_dlist_head_t completed
Commands complete with replies.
Definition pipeline.c:95
fr_redis_command_type_t
Definition pipeline.c:53
@ FR_REDIS_COMMAND_TRANSACTION_START
Start of a transaction block.
Definition pipeline.c:55
@ FR_REDIS_COMMAND_NORMAL
A normal, non-transactional command.
Definition pipeline.c:54
@ FR_REDIS_COMMAND_TRANSACTION_END
End of a transaction block.
Definition pipeline.c:58
uint8_t redirected
How many times this command set was redirected.
Definition pipeline.c:98
size_t len
Length of the command string.
Definition pipeline.c:74
trunk_request_t * treq
Trunk request this command set is associated with.
Definition pipeline.c:108
fr_dlist_head_t pending
Commands yet to be sent.
Definition pipeline.c:93
fr_redis_command_set_t * cmds
Command set this entry belongs to.
Definition pipeline.c:68
static void _redis_pipeline_mux(trunk_connection_t *tconn, connection_t *conn, UNUSED void *uctx)
Enqueue one or more command sets onto a redis handle.
Definition pipeline.c:483
redisReply * fr_redis_command_get_result(fr_redis_command_t *cmd)
Definition pipeline.c:270
Thread local state for a cluster.
Definition pipeline.c:39
Represents a single command.
Definition pipeline.c:67
Represents a collection of pipelined commands.
Definition pipeline.c:87
Redis asynchronous command pipelining.
void(* fr_redis_command_set_complete_t)(request_t *request, fr_dlist_head_t *completed, void *rctx)
Do something meaningful with the replies to the commands previously issued.
Definition pipeline.h:58
void(* fr_redis_command_set_fail_t)(request_t *request, fr_dlist_head_t *completed, void *rctx)
Write a failure result to the rctx so that the module is aware that the request failed.
Definition pipeline.h:64
fr_redis_pipeline_status_t
Definition pipeline.h:41
@ FR_REDIS_PIPELINE_OK
No failure.
Definition pipeline.h:42
@ FR_REDIS_PIPELINE_BAD_CMDS
Malformed command set.
Definition pipeline.h:43
@ FR_REDIS_PIPELINE_DST_UNAVAILABLE
Cluster or host is down.
Definition pipeline.h:44
@ FR_REDIS_PIPELINE_FAIL
Generic failure.
Definition pipeline.h:46
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
Definition radclient.h:52
static rs_t * conf
Definition radsniff.c:53
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition base.h:64
fr_aka_sim_id_type_t type
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition talloc.c:171
#define talloc_zero_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition talloc.h:177
void trunk_request_signal_fail(trunk_request_t *treq)
Signal that a trunk request failed.
Definition trunk.c:2132
trunk_t * trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, trunk_io_funcs_t const *funcs, trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition trunk.c:4945
trunk_enqueue_t trunk_request_enqueue(trunk_request_t **treq_out, trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition trunk.c:2587
int trunk_connection_pop_request(trunk_request_t **treq_out, trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition trunk.c:3883
void trunk_request_signal_sent(trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition trunk.c:2050
void trunk_request_signal_complete(trunk_request_t *treq)
Signal that a trunk request is complete.
Definition trunk.c:2094
Associates request queues with a connection.
Definition trunk.c:134
Wraps a normal request.
Definition trunk.c:100
Main trunk management handle.
Definition trunk.c:198
trunk_connection_alloc_t connection_alloc
Allocate a new connection_t.
Definition trunk.h:733
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition trunk.h:271
trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition trunk.h:55
@ TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition trunk.h:56
@ TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition trunk.h:57
@ TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition trunk.h:58
@ TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition trunk.h:153
@ TRUNK_ENQUEUE_OK
Operation was successful.
Definition trunk.h:150
@ TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition trunk.h:149
Common configuration parameters for a trunk.
Definition trunk.h:224
I/O functions to pass to trunk_alloc.
Definition trunk.h:732
static fr_event_list_t * el