The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
pipeline.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 90735c069802c045065e787b35548b0cfa116e49 $
19  * @file lib/redis/pipeline.c
20  * @brief Functions for pipelining commands.
21  *
22  * @copyright 2019 The FreeRADIUS server project
23  * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24  *
25  * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26  */
27 
28 #include <freeradius-devel/server/connection.h>
29 #include <freeradius-devel/server/trunk.h>
30 
31 #include "pipeline.h"
32 #include "io.h"
33 
34 
35 /** Thread local state for a cluster
36  *
37  * MOVE ME TO NEW ASYNC CLUSTER CODE
38  */
41  fr_trunk_conf_t const *tconf; //!< Configuration for all trunks in the cluster.
42  char *log_prefix; //!< Common log prefix to use for all cluster related
43  ///< messages.
44  bool delay_start; //!< Prevent connections from spawning immediately.
45 };
46 
47 /** The thread local free list
48  *
49  * Any entries remaining in the list will be freed when the thread is joined
50  */
51 static _Thread_local fr_dlist_head_t *command_set_free_list;
52 
53 typedef enum {
54  FR_REDIS_COMMAND_NORMAL = 0, //!< A normal, non-transactional command.
55  FR_REDIS_COMMAND_TRANSACTION_START, //!< Start of a transaction block. Either WATCH or MULTI.
56  ///< if a transaction is started with WATCH, then multi
57  ///< is not marked up as a transaction start.
58  FR_REDIS_COMMAND_TRANSACTION_END //!< End of a transaction block. Either EXEC or DISCARD.
59  ///< If this command fails with
60  ///< MOVED or ASK, all commands back to the previous
61  ///< MULTI command must be requeued.
63 
64 /** Represents a single command
65  *
66  */
68  fr_redis_command_set_t *cmds; //!< Command set this entry belongs to.
69  fr_dlist_t entry; //!< Entry in the command buffer.
70 
71  fr_redis_command_type_t type; //!< Redis command type.
72 
73  char const *str; //!< The command string.
74  size_t len; //!< Length of the command string.
75 
76  uint64_t sqn; //!< The sequence number of the command. This is only
77  ///< valid for a specific handle, and is unique within
78  ///< the handle.
79 
80  redisReply *result; //!< The result from the REDIS server.
81 };
82 
83 /** Represents a collection of pipelined commands
84  *
85  * Commands MUST map to the same cluster node if using clustering.
86  */
89 
90  /** @name Command state lists
91  * @{
92  */
93  fr_dlist_head_t pending; //!< Commands yet to be sent.
94  fr_dlist_head_t sent; //!< Commands sent.
95  fr_dlist_head_t completed; //!< Commands complete with replies.
96  /** @} */
97 
98  uint8_t redirected; //!< How many times this command set was redirected.
99 
100  /** @name Request state
101  *
102  * treq and request are duplicated here with the trunk code.
103  * The reason for this is that a fr_redis_command_set_t may need to be transferred
104  * between trunks when redirects are being followed, and so we need this information
105  * encapsulated within the command set, not just within the trunk.
106  * @{
107  */
108  fr_trunk_request_t *treq; //!< Trunk request this command set is associated with.
109  request_t *request; //!< Request this commands set is associated with (if any).
110  void *rctx; //!< Resume context to write results to.
111  /** @} */
112 
113  /** @name Callback functions
114  * @{
115  */
116  fr_redis_command_set_complete_t complete; //!< Notify the creator of the command set
117  ///< that the command set has executed
118  ///< to completion. We have results for
119  ///< all commands.
120 
121  fr_redis_command_set_fail_t fail; //!< Notify the creator of the command set
122  ///< that the command set failed to execute
123  ///< to completion. Partial results will
124  ///< be available.
125  /** @} */
126 
127  /** @name Command set transaction stats
128  *
129  * We do these checks as REDIS commands from a great number of requests may pipeline
130  * requests on the same connection and leaving a transaction open would be fairly
131  * catastrophic, potentially causing errors across all future command sets set to
132  * the connection.
133  * @{
134  */
135  bool txn_watch; //!< Transaction was started with a watch statement.
136  uint16_t txn_start; //!< Number of times a transaction block was started
137  ///< in this command set.
138  uint16_t txn_end; //!< The number of times a transaction block ended
139  ///< in this command set.
140 
141  /** @} */
142 };
143 
145  fr_redis_io_conf_t const *io_conf; //!< Redis I/O configuration. Specifies how to connect
146  ///< to the host this trunk is used to communicate with.
147  fr_trunk_t *trunk; //!< Trunk containing all the connections to a specific
148  ///< host.
149  fr_redis_cluster_thread_t *cluster; //!< Cluster this trunk belongs to.
150 };
151 
152 /** Free any free requests when the thread is joined
153  *
154  */
156 {
157  fr_dlist_head_t *list = talloc_get_type_abort(arg, fr_dlist_head_t);
159 
160  /*
161  * See the destructor for why this works
162  */
163  while ((cmds = fr_dlist_head(list))) if (talloc_free(cmds) < 0) return -1;
164  return talloc_free(list);
165 }
166 
167 /** Talloc destructor for a command set, keeps up to 1024 sets back for reuse
168  * @return 0 if talloc should really free the set, -1 if the free was vetoed so the set can be reused.
169  */
171 {
172  /*
173  * Freed from the free list.... Always unlink first, even when the buffer is
174  * full, otherwise the free list is left pointing at freed memory.
175  */
176  if (unlikely(fr_dlist_entry_in_list(&cmds->entry))) {
177  fr_dlist_entry_unlink(&cmds->entry); /* Don't trust the list head to be available */
178  return 0;
179  }
180 
181  if (fr_dlist_num_elements(command_set_free_list) >= 1024) return 0; /* Keep a buffer of 1024 */
182  talloc_free_children(cmds);
183  memset(cmds, 0, sizeof(*cmds)); /* Wipe the set itself, not the local pointer */
184 
186 
187  return -1; /* Prevent the free */
188 }
189 
190 /** Allocate a new command set
191  *
192  * This is a set of commands that the calling module wants to execute
193  * on the redis server in sequence.
194  *
195  * Control will be returned to the caller via the registered complete
196  * and fail functions.
197  *
198  * @param[in] ctx to bind the command set's lifetime to.
199  * @param[in] request to pass to places that need it.
200  * @param[in] complete Function to call when all commands have been processed.
201  * @param[in] fail Function to call if the command set was not executed
202  * or was partially executed.
203  * @param[in] rctx Resume context to pass to complete and fail functions.
204  * @return A new or refurbished command set.
205  */
207  request_t *request,
210  void *rctx)
211 
212 {
214  fr_dlist_head_t *free_list;
215 
216 #define COMMAND_PRE_ALLOC_COUNT 8 //!< How much room we pre-allocate for commands.
217 #define COMMAND_PRE_ALLOC_LEN 64 //!< How much we allocate for each command string.
218 
219  /*
220  * Initialise the free list
221  */
223  MEM(free_list = talloc(NULL, fr_dlist_head_t));
224  fr_dlist_init(free_list, fr_redis_command_set_t, entry);
226  } else {
227  free_list = command_set_free_list;
228  }
229 
230  /*
231  * Pull an element out of the free list
232  * or allocate a new one.
233  */
234  cmds = fr_dlist_head(free_list);
235  if (!cmds) {
240  talloc_set_destructor(cmds, _redis_command_set_free);
241  fr_dlist_entry_init(&cmds->entry);
242  } else {
243  fr_dlist_remove(free_list, cmds);
244  }
245 
249  cmds->request = request;
250  cmds->complete = complete;
251  cmds->fail = fail;
252  cmds->rctx = rctx;
253 
254  if (ctx) talloc_link_ctx(ctx, cmds);
255 
256  return cmds;
257 }
258 
259 /** Free any result associated with the command
260  *
261  * @param[in] cmd to free. Frees any redis results associated with the command.
262  */
264 {
265  //if (cmd->result) fr_redis_reply_free(&cmd->result);
266 
267  return 0;
268 }
269 
271 {
272  return cmd->result;
273 }
274 
275 /** Add a preformatted/expanded command to the command set
276  *
277  * The command must either be entirely static, or parented by the command set.
278  * MULTI, EXEC, DISCARD and WATCH are matched case insensitively for the transaction checks.
279  * @note Caller should disallow "SUBSCRIBE" et al, if they're not appropriate.
280  * As subscribing to a stream where we're not expecting it would break
281  * things, badly.
282  *
283  * @param[in] cmds Command set to add command to.
284  * @param[in] cmd_str A fully expanded/formatted command to send to redis.
285  * Must be static, or have the same lifetime as the
286  * command set (allocated with the command set as the parent).
287  * @param[in] cmd_len Length of the command.
288  * @return
289  * - FR_REDIS_PIPELINE_BAD_CMDS if a bad command sequence is enqueued.
290  * - FR_REDIS_PIPELINE_OK if command was enqueued successfully.
291  */
293  char const *cmd_str, size_t cmd_len)
294 {
295  request_t *request = cmds->request;
296  fr_redis_command_t *cmd;
298 
299  /*
300  * Transaction sanity checks.
301  *
302  * Because commands from many different requests share the same connection
303  * we need to ensure that transaction blocks aren't left dangling and
304  * that the commands are all in the right order.
305  *
306  * We try very hard to do this without incurring a performance penalty
307  * for non-transactional commands.
308  */
309  switch (tolower((unsigned char)cmd_str[0])) {
310  case 'm':
311  if (tolower((unsigned char)cmd_str[1]) != 'u') break; /* Fold the character, not the comparison result */
312  if (strncasecmp(cmd_str, "multi", sizeof("multi") - 1) != 0) break;
313  /*
314  * There should only ever be a difference of
315  * 1 between txn starts and txn ends.
316  */
317  if ((cmds->txn_end < cmds->txn_start) && ((cmds->txn_start - cmds->txn_end) > 1)) {
318  ROPTIONAL(ERROR, REDEBUG, "Too many consecutive \"MULTI\" commands");
320  }
321  /*
322  * If we have a watch before the MULTI,
323  * that's marked as the start of the transaction
324  * block.
325  */
327  cmds->txn_start++; /* Yes MULTI increments start, not WATCH */
328  break;
329 
330  case 'e':
331  if (tolower((unsigned char)cmd_str[1]) != 'e') break;
332  if (strncasecmp(cmd_str, "exec", sizeof("exec") - 1) != 0) break;
333  goto txn_end;
334 
335  /*
336  * It's useful to allow discard as it allows command syntax checks
337  * to be performed against the REDIS server without actually
338  * executing the commands.
339  */
340  case 'd':
341  if (tolower((unsigned char)cmd_str[1]) != 'i') break;
342  if (strncasecmp(cmd_str, "discard", sizeof("discard") - 1) != 0) break;
343  txn_end:
344  if (cmds->txn_start <= cmds->txn_end) {
345  ROPTIONAL(ERROR, REDEBUG, "Transaction not started, missing \"MULTI\" command");
347  }
349  cmds->txn_end++;
350  break;
351 
352  case 'w':
353  if (tolower((unsigned char)cmd_str[1]) != 'a') break;
354  if (strncasecmp(cmd_str, "watch", sizeof("watch") - 1) != 0) break;
355  if (cmds->txn_watch) {
356  ROPTIONAL(ERROR, REDEBUG, "Too many consecutive \"WATCH\" commands");
358  }
359  if (cmds->txn_start > cmds->txn_end) {
360  ROPTIONAL(ERROR, REDEBUG, "\"WATCH\" can only be used before \"MULTI\"");
362  }
363  FALL_THROUGH;
364 
365  default:
366  break;
367  }
368 
369  MEM(cmd = talloc_zero(cmds, fr_redis_command_t));
370  talloc_set_destructor(cmd, _redis_command_free);
371  cmd->cmds = cmds;
372  cmd->type = type;
373  cmd->str = cmd_str;
374  cmd->len = cmd_len;
375  fr_dlist_insert_tail(&cmds->pending, cmd);
376 
377  return FR_REDIS_PIPELINE_OK;
378 }
379 
380 /** Enqueue a command set on a specific trunk
381  *
382  * The command set may be passed around several trunks before it is complete.
383  * This is to allow it to follow MOVED and ASK responses.
384  *
385  * @param[in] rtrunk to enqueue command set on.
386  * @param[in] cmds Command set to enqueue.
387  * @return
388  * - FR_REDIS_PIPELINE_OK if commands were immediately enqueued or placed in the backlog.
389  * - FR_REDIS_PIPELINE_DST_UNAVAILABLE if the REDIS host is unreachable.
390  * - FR_REDIS_PIPELINE_FAIL any other general error.
391  */
393 {
394  if (cmds->txn_start != cmds->txn_end) {
395  ERROR("Refusing to enqueue - Unbalanced transaction start/stop commands");
397  }
398 
399  switch (fr_trunk_request_enqueue(&cmds->treq, rtrunk->trunk, cmds->request, cmds, cmds->rctx)) {
400  case FR_TRUNK_ENQUEUE_OK:
402  return FR_REDIS_PIPELINE_OK;
403 
406 
407  default:
408  return FR_REDIS_PIPELINE_FAIL;
409  }
410 }
411 
412 /** Callback for receiving Redis replies
413  *
414  * This is called by hiredis for each response it receives. privdata is set to the
415  * fr_redis_command_t the response belongs to.
416  *
417  * @note Called only from hiredis, not the trunk itself.
418  *
419  * @param[in] ac The async context the command was enqueued on.
420  * @param[in] vreply redisReply containing the result of the command.
421  * @param[in] privdata fr_redis_command_t that was sent to the Redis server.
422  * The fr_redis_command_t contains a pointer to the
423  * fr_redis_command_set_t which holds the treq which
424  * we use to signal that we have responses for all
425  * commands.
426  */
427 static void _redis_pipeline_demux(struct redisAsyncContext *ac, void *vreply, void *privdata)
428 {
429  fr_redis_command_t *cmd;
431  fr_connection_t *conn = talloc_get_type_abort(ac->ev.data, fr_connection_t);
432  fr_redis_handle_t *h = talloc_get_type_abort(conn->h, fr_redis_handle_t);
433  redisReply *reply = vreply;
434  /*
435  * First check if we should ignore the response
436  */
438  DEBUG4("Ignoring response with SQN %"PRIu64, (h->rsp_sqn - 1)); /* Already incremented */
439  fr_redis_reply_free((redisReply **)&reply);
440  return;
441  }
442 
443  /*
444  * FIXME - Need to check TRYAGAIN, MOVED etc...
445  * I guess we might want to wait for the end of
446  * the command set to do that.
447  */
448  cmd = talloc_get_type_abort(privdata, fr_redis_command_t);
449  cmds = cmd->cmds;
450  cmd->result = reply;
451 
452  fr_dlist_remove(&cmds->sent, cmd);
453  fr_dlist_insert_tail(&cmds->completed, cmd);
454 
455  /*
456  * Check if the command set is complete,
457  * and if it is, tell the trunk the treq
458  * is complete.
459  */
460  if ((fr_dlist_num_elements(&cmds->pending) == 0) &&
462 }
463 
465  fr_connection_conf_t const *conf,
466  char const *log_prefix, void *uctx)
467 {
468  fr_redis_trunk_t *rtrunk = talloc_get_type_abort(uctx, fr_redis_trunk_t);
469 
470  return fr_redis_connection_alloc(tconn, el, conf, rtrunk->io_conf, log_prefix);
471 }
472 
473 /** Enqueue one or more command sets onto a redis handle
474  *
475  * Because the trunk is in always writable mode, _redis_pipeline_mux
476  * will be called any time fr_trunk_request_enqueue is called, so there'll only
477  * ever be one command to dequeue.
478  *
479  * @param[in] tconn Trunk connection holding the commands to enqueue.
480  * @param[in] conn Connection handle containing the fr_redis_handle_t.
481  * @param[in] uctx fr_redis_cluster_t. Unused.
482  */
483 static void _redis_pipeline_mux(fr_trunk_connection_t *tconn, fr_connection_t *conn, UNUSED void *uctx)
484 {
485  fr_trunk_request_t *treq;
487  fr_redis_command_t *cmd;
488  fr_redis_handle_t *h = talloc_get_type_abort(conn->h, fr_redis_handle_t);
489  request_t *request;
490 
491  treq = fr_trunk_connection_pop_request(&request, (void *)&cmds, NULL, tconn);
492  while ((cmd = fr_dlist_head(&cmds->pending))) {
493  /*
494  * If this fails it probably means the connection
495  * is disconnecting, but if that's happening then
496  * we shouldn't be enqueueing new requests?
497  */
498  if (unlikely(redisAsyncCommand(h->ac, _redis_pipeline_demux, cmd, "%s", cmd->str) != REDIS_OK)) {
499  ROPTIONAL(ERROR, REDEBUG, "Unexpected error queueing REDIS command");
500 
501  while ((cmd = fr_dlist_head(&cmds->sent))) {
503  fr_dlist_remove(&cmds->sent, cmd);
504  fr_dlist_insert_tail(&cmds->pending, cmd);
505  }
507  return;
508  }
510  fr_dlist_remove(&cmds->pending, cmd);
511  fr_dlist_insert_tail(&cmds->sent, cmd);
512  }
514 }
515 
516 /** Deal with cancellation of sent requests
517  *
518  * We can't actually signal redis to not process the request, so depending
519  * on why the commands were cancelled, we either tell the handle to ignore
520  * them, or move them back into the pending list.
521  */
523  fr_trunk_cancel_reason_t reason, UNUSED void *uctx)
524 {
525  fr_redis_command_set_t *cmds = talloc_get_type_abort(preq, fr_redis_command_set_t);
526  fr_redis_handle_t *h = talloc_get_type_abort(conn->h, fr_redis_handle_t);
527 
528  /*
529  * How we cancel is very different depending
530  * on _WHY_ we're cancelling.
531  */
532  switch (reason) {
533  /*
534  * Cancel is only called for requests that
535  * have been sent, and only when the connection
536  * is about to be closed for some reason.
537  *
538  * We don't need to tell the handle to ignore
539  * the responses, we just need to get the
540  * command set back into the correct state for
541  * execution by another handle.
542  */
544  fr_dlist_move(&cmds->pending, &cmds->sent);
545  return;
546 
547  /*
548  * If the request was cancelled due to a signal
549  * we'll have a response coming back for a
550  * request, pctx and rctx that no longer exist.
551  * Tell the handle to signal that the response
552  * should be ignored when it's received.
553  *
554  * Free will take care of cleaning up the
555  * pending commands.
556  */
558  {
559  fr_redis_command_t *cmd;
560 
561  for (cmd = fr_dlist_head(&cmds->sent);
562  cmd;
563  cmd = fr_dlist_next(&cmds->sent, cmd)) {
565  }
566  }
567  return; /* Sent commands dealt with, don't run on into the fr_assert(0) below */
569  fr_assert(0);
570  return;
571  }
572 }
573 
574 /** Trunk request_complete callback - hands the finished command set back to its creator
575  * Does nothing when no complete function was registered; the set's own request, completed list and rctx are passed on.
576  */
577 static void _redis_pipeline_command_set_complete(UNUSED request_t *request, void *preq,
578  UNUSED void *rctx, UNUSED void *uctx)
579 {
580  fr_redis_command_set_t *set = talloc_get_type_abort(preq, fr_redis_command_set_t);
581  if (!set->complete) return;
582  set->complete(set->request, &set->completed, set->rctx);
583 }
584 
585 /** Trunk request_fail callback - tells the creator of the command set that it failed
586  * Does nothing when no fail function was registered; the set's own request, completed list and rctx are passed on.
587  */
588 static void _redis_pipeline_command_set_fail(UNUSED request_t *request, void *preq,
589  UNUSED void *rctx, UNUSED void *uctx)
590 {
591  fr_redis_command_set_t *set = talloc_get_type_abort(preq, fr_redis_command_set_t);
592  if (!set->fail) return;
593  set->fail(set->request, &set->completed, set->rctx);
594 }
595 
596 /** Trunk request_free callback - releases the command set carried as preq
597  *
598  */
599 static void _redis_pipeline_command_set_free(UNUSED request_t *request, void *preq,
600  UNUSED void *uctx)
601 {
602  fr_redis_command_set_t *set = talloc_get_type_abort(preq, fr_redis_command_set_t);
603  /* The destructor installed on the set decides whether the memory really goes away */
604  talloc_free(set);
605 }
606 
607 /** Allocate a new trunk
608  *
609  * @param[in] cluster_thread to allocate the trunk for.
610  * @param[in] io_conf Describing the connection to a single REDIS host.
611  * @return
612  * - On success, a new fr_redis_trunk_t which can be used for pipelining commands.
613  * - NULL on failure.
614  */
616 {
617  fr_redis_trunk_t *rtrunk;
618  fr_trunk_io_funcs_t io_funcs = {
620  .request_mux = _redis_pipeline_mux,
621  /* demux called directly by hiredis */
622  .request_cancel = _redis_pipeline_command_set_cancel,
623  .request_complete = _redis_pipeline_command_set_complete,
624  .request_fail = _redis_pipeline_command_set_fail,
625  .request_free = _redis_pipeline_command_set_free
626  };
627 
628  MEM(rtrunk = talloc_zero(cluster_thread, fr_redis_trunk_t));
629  rtrunk->io_conf = io_conf;
630  rtrunk->trunk = fr_trunk_alloc(rtrunk, cluster_thread->el,
631  &io_funcs, cluster_thread->tconf, cluster_thread->log_prefix, rtrunk,
632  cluster_thread->delay_start);
633  if (!rtrunk->trunk) {
634  talloc_free(rtrunk);
635  return NULL;
636  }
637 
638  return rtrunk;
639 }
640 
641 /** Allocate per-thread, per-cluster instance
642  *
643  * This structure represents all the connections for a given thread for a given cluster.
644  * The structure holds the trunk connections to talk to each cluster member.
645  *
646  */
648 {
649  fr_redis_cluster_thread_t *cluster_thread;
650  fr_trunk_conf_t *our_tconf;
651 
652  MEM(cluster_thread = talloc_zero(ctx, fr_redis_cluster_thread_t));
653  MEM(our_tconf = talloc_memdup(cluster_thread, tconf, sizeof(*tconf)));
654  our_tconf->always_writable = true;
655 
656  cluster_thread->el = el;
657  cluster_thread->tconf = our_tconf;
658 
659  return cluster_thread;
660 }
661 
662 
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition: atexit.h:221
#define FALL_THROUGH
clang 10 doesn't recognise the FALL-THROUGH comment anymore
Definition: build.h:320
#define unlikely(_x)
Definition: build.h:378
#define UNUSED
Definition: build.h:313
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition: dlist.h:555
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition: dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition: dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
static int fr_dlist_move(fr_dlist_head_t *list_dst, fr_dlist_head_t *list_src)
Merge two lists, inserting the source at the tail of the destination.
Definition: dlist.h:763
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:275
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition: dlist.h:338
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition: dlist.h:138
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
fr_connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition: io.c:421
fr_redis_sqn_t rsp_sqn
Current redis response number.
Definition: io.h:85
redisAsyncContext * ac
Async handle for hiredis.
Definition: io.h:74
static void fr_redis_connection_ignore_response(fr_redis_handle_t *h, fr_redis_sqn_t sqn)
Ignore a response with a specific sequence number.
Definition: io.h:106
static bool fr_redis_connection_process_response(fr_redis_handle_t *h)
Update the response sequence number and check if we should ignore the response.
Definition: io.h:124
static fr_redis_sqn_t fr_redis_connection_sent_request(fr_redis_handle_t *h)
Tell the handle we sent a command, and get the SQN that command was assigned.
Definition: io.h:96
Store I/O state.
Definition: io.h:66
#define ROPTIONAL(_l_request, _l_global, _fmt,...)
Use different logging functions depending on whether request is NULL or not.
Definition: log.h:528
#define DEBUG4(_fmt,...)
Definition: log.h:267
talloc_free(reap)
Stores all information relating to an event list.
Definition: event.c:411
unsigned short uint16_t
Definition: merged_model.c:31
unsigned char uint8_t
Definition: merged_model.c:30
int strncasecmp(char *s1, char *s2, int n)
Definition: missing.c:36
Function prototypes and datatypes for the REST (HTTP) transport.
request_t * request
Request this commands set is associated with (if any).
Definition: pipeline.c:109
static int _command_set_free_list_free_on_exit(void *arg)
Free any free requests when the thread is joined.
Definition: pipeline.c:155
fr_redis_cluster_thread_t * cluster
Cluster this trunk belongs to.
Definition: pipeline.c:149
fr_dlist_head_t sent
Commands sent.
Definition: pipeline.c:94
uint16_t txn_start
Number of times a transaction block was started in this command set.
Definition: pipeline.c:136
fr_redis_io_conf_t const * io_conf
Redis I/O configuration.
Definition: pipeline.c:145
fr_redis_command_set_t * fr_redis_command_set_alloc(TALLOC_CTX *ctx, request_t *request, fr_redis_command_set_complete_t complete, fr_redis_command_set_fail_t fail, void *rctx)
Allocate a new command set.
Definition: pipeline.c:206
fr_dlist_t entry
Entry in the command buffer.
Definition: pipeline.c:69
fr_trunk_request_t * treq
Trunk request this command set is associated with.
Definition: pipeline.c:108
#define COMMAND_PRE_ALLOC_COUNT
void * rctx
Resume context to write results to.
Definition: pipeline.c:110
uint64_t sqn
The sequence number of the command.
Definition: pipeline.c:76
fr_redis_command_set_complete_t complete
Notify the creator of the command set that the command set has executed to completion.
Definition: pipeline.c:116
fr_redis_command_set_fail_t fail
Notify the creator of the command set that the command set failed to execute to completion.
Definition: pipeline.c:121
char * log_prefix
Common log prefix to use for all cluster related messages.
Definition: pipeline.c:42
uint16_t txn_end
The number of times a transaction block ended in this command set.
Definition: pipeline.c:138
fr_redis_pipeline_status_t redis_command_set_enqueue(fr_redis_trunk_t *rtrunk, fr_redis_command_set_t *cmds)
Enqueue a command set on a specific trunk.
Definition: pipeline.c:392
static int _redis_command_free(fr_redis_command_t *cmd)
Free any result associated with the command.
Definition: pipeline.c:263
static void _redis_pipeline_demux(struct redisAsyncContext *ac, void *vreply, void *privdata)
Callback for receiving Redis replies.
Definition: pipeline.c:427
fr_redis_cluster_thread_t * fr_redis_cluster_thread_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_trunk_conf_t const *tconf)
Allocate per-thread, per-cluster instance.
Definition: pipeline.c:647
fr_event_list_t * el
Definition: pipeline.c:40
#define COMMAND_PRE_ALLOC_LEN
char const * str
The command string.
Definition: pipeline.c:73
fr_trunk_conf_t const * tconf
Configuration for all trunks in the cluster.
Definition: pipeline.c:41
bool delay_start
Prevent connections from spawning immediately.
Definition: pipeline.c:44
redisReply * fr_redis_command_get_result(fr_redis_command_t *cmd)
Definition: pipeline.c:270
fr_trunk_t * trunk
Trunk containing all the connections to a specific host.
Definition: pipeline.c:147
static void _redis_pipeline_command_set_free(UNUSED request_t *request, void *preq, UNUSED void *uctx)
Free the command set.
Definition: pipeline.c:599
static _Thread_local fr_dlist_head_t * command_set_free_list
The thread local free list.
Definition: pipeline.c:51
redisReply * result
The result from the REDIS server.
Definition: pipeline.c:80
static void _redis_pipeline_command_set_cancel(fr_connection_t *conn, UNUSED fr_trunk_request_t *treq, void *preq, fr_trunk_cancel_reason_t reason, UNUSED void *uctx)
Deal with cancellation of sent requests.
Definition: pipeline.c:522
fr_redis_command_type_t type
Redis command type.
Definition: pipeline.c:71
static int _redis_command_set_free(fr_redis_command_set_t *cmds)
Free a command set.
Definition: pipeline.c:170
fr_dlist_t entry
Definition: pipeline.c:88
static void _redis_pipeline_command_set_complete(UNUSED request_t *request, void *preq, UNUSED void *rctx, UNUSED void *uctx)
Signal the API client that we got a complete set of responses to a command set.
Definition: pipeline.c:577
static void _redis_pipeline_command_set_fail(UNUSED request_t *request, void *preq, UNUSED void *rctx, UNUSED void *uctx)
Signal the API client that we failed enqueuing the commands.
Definition: pipeline.c:588
bool txn_watch
Transaction was started with a watch statement.
Definition: pipeline.c:135
fr_redis_pipeline_status_t fr_redis_command_preformatted_add(fr_redis_command_set_t *cmds, char const *cmd_str, size_t cmd_len)
Add a preformatted/expanded command to the command set.
Definition: pipeline.c:292
static void _redis_pipeline_mux(fr_trunk_connection_t *tconn, fr_connection_t *conn, UNUSED void *uctx)
Enqueue one or more command sets onto a redis handle.
Definition: pipeline.c:483
fr_dlist_head_t completed
Commands complete with replies.
Definition: pipeline.c:95
fr_redis_command_type_t
Definition: pipeline.c:53
@ FR_REDIS_COMMAND_TRANSACTION_START
Start of a transaction block.
Definition: pipeline.c:55
@ FR_REDIS_COMMAND_NORMAL
A normal, non-transactional command.
Definition: pipeline.c:54
@ FR_REDIS_COMMAND_TRANSACTION_END
End of a transaction block.
Definition: pipeline.c:58
uint8_t redirected
How many times this command set was redirected.
Definition: pipeline.c:98
size_t len
Length of the command string.
Definition: pipeline.c:74
fr_redis_trunk_t * fr_redis_trunk_alloc(fr_redis_cluster_thread_t *cluster_thread, fr_redis_io_conf_t const *io_conf)
Allocate a new trunk.
Definition: pipeline.c:615
fr_dlist_head_t pending
Commands yet to be sent.
Definition: pipeline.c:93
fr_redis_command_set_t * cmds
Command set this entry belongs to.
Definition: pipeline.c:68
static fr_connection_t * _redis_pipeline_connection_alloc(fr_trunk_connection_t *tconn, fr_event_list_t *el, fr_connection_conf_t const *conf, char const *log_prefix, void *uctx)
Definition: pipeline.c:464
Thread local state for a cluster.
Definition: pipeline.c:39
Represents a single command.
Definition: pipeline.c:67
Represents a collection of pipelined commands.
Definition: pipeline.c:87
Redis asynchronous command pipelining.
void(* fr_redis_command_set_complete_t)(request_t *request, fr_dlist_head_t *completed, void *rctx)
Do something meaningful with the replies to the commands previously issued.
Definition: pipeline.h:58
void(* fr_redis_command_set_fail_t)(request_t *request, fr_dlist_head_t *completed, void *rctx)
Write a failure result to the rctx so that the module is aware that the request failed.
Definition: pipeline.h:64
fr_redis_pipeline_status_t
Definition: pipeline.h:41
@ FR_REDIS_PIPELINE_OK
No failure.
Definition: pipeline.h:42
@ FR_REDIS_PIPELINE_BAD_CMDS
Malformed command set.
Definition: pipeline.h:43
@ FR_REDIS_PIPELINE_DST_UNAVAILABLE
Cluster or host is down.
Definition: pipeline.h:44
@ FR_REDIS_PIPELINE_FAIL
Generic failure.
Definition: pipeline.h:46
#define REDEBUG(fmt,...)
Definition: radclient.h:52
static rs_t * conf
Definition: radsniff.c:53
static void fr_redis_reply_free(redisReply **reply)
Wrap freeReplyObject so we consistently check for NULL pointers.
Definition: base.h:64
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
fr_aka_sim_id_type_t type
int talloc_link_ctx(TALLOC_CTX *parent, TALLOC_CTX *child)
Link two different parent and child contexts, so the child is freed before the parent.
Definition: talloc.c:167
#define talloc_zero_pooled_object(_ctx, _type, _num_subobjects, _total_subobjects_size)
Definition: talloc.h:165
void fr_trunk_request_signal_sent(fr_trunk_request_t *treq)
Signal that the request was written to a connection successfully.
Definition: trunk.c:1973
fr_trunk_enqueue_t fr_trunk_request_enqueue(fr_trunk_request_t **treq_out, fr_trunk_t *trunk, request_t *request, void *preq, void *rctx)
Enqueue a request that needs data written to the trunk.
Definition: trunk.c:2481
void fr_trunk_request_signal_complete(fr_trunk_request_t *treq)
Signal that a trunk request is complete.
Definition: trunk.c:1995
fr_trunk_t * fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_trunk_io_funcs_t const *funcs, fr_trunk_conf_t const *conf, char const *log_prefix, void const *uctx, bool delay_start)
Allocate a new collection of connections.
Definition: trunk.c:4767
int fr_trunk_connection_pop_request(fr_trunk_request_t **treq_out, fr_trunk_connection_t *tconn)
Pop a request off a connection's pending queue.
Definition: trunk.c:3756
void fr_trunk_request_signal_fail(fr_trunk_request_t *treq)
Signal that a trunk request failed.
Definition: trunk.c:2027
Associates request queues with a connection.
Definition: trunk.c:127
Wraps a normal request.
Definition: trunk.c:97
Main trunk management handle.
Definition: trunk.c:189
fr_trunk_cancel_reason_t
Reasons for a request being cancelled.
Definition: trunk.h:55
@ FR_TRUNK_CANCEL_REASON_SIGNAL
Request cancelled due to a signal.
Definition: trunk.h:57
@ FR_TRUNK_CANCEL_REASON_MOVE
Request cancelled because it's being moved.
Definition: trunk.h:58
@ FR_TRUNK_CANCEL_REASON_NONE
Request has not been cancelled.
Definition: trunk.h:56
fr_trunk_connection_alloc_t connection_alloc
Allocate a new fr_connection_t.
Definition: trunk.h:712
bool always_writable
Set to true if our ability to write requests to a connection handle is not dependent on the state of ...
Definition: trunk.h:256
@ FR_TRUNK_ENQUEUE_IN_BACKLOG
Request should be enqueued in backlog.
Definition: trunk.h:149
@ FR_TRUNK_ENQUEUE_OK
Operation was successful.
Definition: trunk.h:150
@ FR_TRUNK_ENQUEUE_DST_UNAVAILABLE
Destination is down.
Definition: trunk.h:153
Common configuration parameters for a trunk.
Definition: trunk.h:213
I/O functions to pass to fr_trunk_alloc.
Definition: trunk.h:711
static fr_event_list_t * el