The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
io.c
Go to the documentation of this file.
1 /*
2  * This program is is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 476aefdc5a50be87eb41c4af3de90d63bcfa9a3b $
19  * @file lib/redis/io.c
20  * @brief Common functions for interacting with Redis via hiredis
21  *
22  * @copyright 2019 The FreeRADIUS server project
23  * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24  *
25  * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26  */
27 
28 #include <freeradius-devel/redis/io.h>
29 #include <freeradius-devel/util/debug.h>
30 
31 #include <hiredis/async.h>
32 
33 /** Called by hiredis to indicate the connection is dead
34  *
35  */
36 static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
37 {
38  connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
39  fr_redis_handle_t *h = conn->h;
40 
41  /*
42  * redisAsyncFree was called with a live
43  * connection, but inside the talloc
44  * destructor of the fr_redis_handle_t.
45  *
46  * Don't signal the connection state
47  * machine that it needs reconnecting,
48  * the connection is being destroyed.
49  */
50  if (h->ignore_disconnect_cb) return;
51 
52  DEBUG4("Signalled by hiredis, connection disconnected");
53 
55 }
56 
57 /** Called by hiredis to indicate the connection is live
58  *
59  */
60 static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
61 {
62  connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
63 
64  DEBUG4("Signalled by hiredis, connection is open");
65 
67 }
68 
69 /** Redis FD became readable
70  *
71  */
72 static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
73 {
75  fr_redis_handle_t *h = conn->h;
76 
77  DEBUG4("redis handle %p - FD %i now readable", h, fd);
78 
79  redisAsyncHandleRead(h->ac);
80 }
81 
82 /** Redis FD became writable
83  *
84  */
85 static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
86 {
88  fr_redis_handle_t *h = conn->h;
89 
90  DEBUG4("redis handle %p - FD %i now writable", h, fd);
91 
92  redisAsyncHandleWrite(h->ac);
93 }
94 
95 /** Redis FD errored - Automatically removes registered events
96  *
97  */
98 static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags,
99  int fd_errno, void *uctx)
100 {
101  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
102  fr_redis_handle_t *h = conn->h;
103 
104  DEBUG4("redis handle %p - FD %i errored: %s", h, fd, fr_syserror(fd_errno));
105 
106  /*
107  * Connection state machine will handle reconnecting
108  */
110 }
111 
112 /** Deal with the method hiredis uses to register/unregister interest in a file descriptor
113  *
114  */
115 static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
116 {
117  redisContext *c = &(h->ac->c);
118  fr_event_list_t *el = conn->el;
119 
120  /*
121  * hiredis doesn't even attempt to dedup registration
122  * requests *sigh*...
123  */
124  if ((h->read_set == read) && (h->write_set == write)) return;
125 
126  if (!read && !write) {
127  DEBUG4("redis handle %p - De-registering FD %i", h, c->fd);
128 
129  if (fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO) < 0) {
130  PERROR("redis handle %p - De-registration failed for FD %i", h, c->fd);
131  }
132  return;
133  }
134 
135  DEBUG4("redis handle %p - Registered for %s%serror events on FD %i",
136  h, read ? "read+" : "", write ? "write+" : "", c->fd);
137 
138  if (fr_event_fd_insert(h, NULL, el, c->fd,
139  read ? _redis_io_service_readable : NULL,
140  write ? _redis_io_service_writable : NULL,
142  conn) < 0) {
143  PERROR("redis handle %p - Registration failed for %s%serror events on FD %i",
144  h, read ? "read+" : "", write ? "write+" : "", c->fd);
145  return;
146  }
147 
148  h->read_set = read;
149  h->write_set = write;
150 }
151 
152 /** Register FD for reads
153  *
154  */
155 static void _redis_io_add_read(void *uctx)
156 {
157  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
158  fr_redis_handle_t *h = conn->h;
159 
160  _redis_io_common(conn, h, true, h->write_set);
161 }
162 
163 /** De-register FD for reads
164  *
165  */
166 static void _redis_io_del_read(void *uctx)
167 {
168  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
169  fr_redis_handle_t *h = conn->h;
170 
171  _redis_io_common(conn, h, false, h->write_set);
172 }
173 
174 /** Register FD for writes
175  *
176  */
177 static void _redis_io_add_write(void *uctx)
178 {
179  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
180  fr_redis_handle_t *h = conn->h;
181 
182  _redis_io_common(conn, h, h->read_set, true);
183 }
184 
185 /** De-register FD for writes
186  *
187  */
188 static void _redis_io_del_write(void *uctx)
189 {
190  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
191  fr_redis_handle_t *h = conn->h;
192 
193  _redis_io_common(conn, h, h->read_set, false);
194 }
195 
196 #ifdef HAVE_REDIS_TIMEOUT
197 /** Connection timer expired
198  *
199  */
200 static void _redis_io_service_timer_expired(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
201 {
203  fr_redis_handle_t *h = conn->h;
204 
205  DEBUG4("redis handle %p - Timeout", h);
206 
207  redisAsyncHandleTimeout(h->ac);
208 }
209 
210 /** Modify the connection I/O timer
211  *
212  */
213 static void _redis_io_timer_modify(void *uctx, struct timeval tv)
214 {
215  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
216  fr_redis_handle_t *h = conn->h;
218 
220 
221  DEBUG4("redis handle %p - Timeout in %pV seconds", h, fr_box_time_delta(timeout));
222 
223  if (fr_event_timer_in(h, conn->el, &h->timer,
224  timeout, _redis_io_service_timer_expired, conn) < 0) {
225  PERROR("redis timeout %p - Failed adding timeout", h);
226  }
227 }
228 #endif
229 
230 /** Handle freeing the redisAsyncContext
231  *
232  * delRead and delWrite don't seem to be called when the redisAsyncContext is freed
233  *
234  * As the IO events must be removed from the event loop *before* the FD is closed
235  * and as the IO events will only be automatically de-registered when when the
236  * fr_redis_handle_t is freed.
237  *
238  * Unfortunately the destructor for the fr_redis_handle_t will be run before
239  * the IO events are de-registered, which'll free the redisAsycCtx, which'll close
240  * the FD.
241  *
242  * This means there'd be a brief period of time between the FD is closed, and
243  * it being removed from the event loop.
244  *
245  * We use the cleanup callback (which is called before the FD is closed) to remove
246  * the events now, and ensure there's no chance of issues.
247  */
248 static void _redis_io_free(void *uctx)
249 {
250  connection_t *conn = talloc_get_type_abort(uctx, connection_t);
251  fr_redis_handle_t *h = conn->h;
252 
253  DEBUG4("redis handle %p - Freed", h);
254 
255  _redis_io_common(conn, h, false, false);
256 }
257 
258 /** Configures async I/O callbacks for an existing redisAsyncContext
259  *
260  */
261 static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
262 {
263  if (ac->ev.data != NULL) return REDIS_ERR;
264 
265  ac->ev.addRead = _redis_io_add_read;
266  ac->ev.delRead = _redis_io_del_read;
267  ac->ev.addWrite = _redis_io_add_write;
268  ac->ev.delWrite = _redis_io_del_write;
269 #ifdef HAVE_REDIS_TIMEOUT
270  ac->ev.scheduleTimer = _redis_io_timer_modify;
271 #endif
272  ac->ev.cleanup = _redis_io_free;
273  memcpy(&ac->ev.data, &conn, sizeof(ac->ev.data));
274 
275  return REDIS_OK;
276 }
277 
278 /** Free the redis async context when the handle is freed
279  *
280  */
282 {
283  /*
284  * Don't fire the reconnect callback if we're
285  * freeing the handle.
286  */
287  h->ignore_disconnect_cb = true;
288  if (h->ac) redisAsyncFree(h->ac);
289 
290  return 0;
291 }
292 
293 /** Callback for the initialise state
294  *
295  * Should attempt to open a non-blocking connection and return it in h_out.
296  *
297  * @param[out] h_out Where to write the new handle
298  * @param[in] conn This connection. Opaque, should only be used for
299  * signalling the connection state machine.
300  * @param[in] uctx User context.
301  * @return
302  * - #CONNECTION_STATE_CONNECTING if a file descriptor was successfully created.
303  * - #CONNECTION_STATE_FAILED if we could not open a valid handle.
304  */
305 CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
306 static connection_state_t _redis_io_connection_init(void **h_out, connection_t *conn, void *uctx)
307 {
309  char const *host = conf->hostname;
310  uint16_t port = conf->port;
312  int ret;
313 
314  /*
315  * Allocate a structure to wrap the
316  * redis async context.
317  */
318  MEM(h = talloc_zero(conn, fr_redis_handle_t));
319  talloc_set_destructor(h, _redis_handle_free);
320 
321  h->ac = redisAsyncConnect(host, port);
322  if (!h->ac) {
323  ERROR("Failed allocating handle for %s:%u", host, port);
325  }
326 
327  if (h->ac->err) {
328  ERROR("Failed allocating handle for %s:%u: %s", host, port, h->ac->errstr);
329  error:
330  redisAsyncFree(h->ac);
332  }
333 
334  /*
335  * Store the connection in private data,
336  * so we can use it for signalling.
337  *
338  * Comments in the redis src indicate
339  * it doesn't mess with this.
340  */
341  memcpy(&h->ac->data, &conn, sizeof(h->ac->data));
342 
343  /*
344  * Handle has to be associated with the
345  * conn in case I/O handlers want to get
346  * at it.
347  */
348  *h_out = h;
349 
350  /*
351  * Install the I/O service functions
352  *
353  * Event library must be set first
354  * before calling redisAsyncSetConnectCallback
355  * as just setting the callback triggers
356  * I/O events to be registered.
357  */
358  fr_redis_io_setup(h->ac, conn);
359 
360  /*
361  * Setup callbacks so we're notified
362  * when the connection state changes.
363  *
364  * We then signal the connection state
365  * machine, to let it handle
366  * reconnecting.
367  */
368  ret = redisAsyncSetConnectCallback(h->ac, _redis_connected);
369  if (ret != REDIS_OK) {
370  ERROR("Failed setting connected callback: Error %i", ret);
371  goto error;
372  }
373  if (redisAsyncSetDisconnectCallback(h->ac, _redis_disconnected) != REDIS_OK) {
374  ERROR("Failed setting disconnected callback: Error %i", ret);
375  goto error;
376  }
377 
379 
381 }
382 
383 /** Gracefully signal that the connection should shutdown
384  *
385  */
387 {
388  fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
389 
390  redisAsyncDisconnect(our_h->ac); /* Should not free the handle */
391 
393 }
394 
395 /** Notification that the connection has errored and must be closed
396  *
397  * This should be used to close the file descriptor. It is assumed
398  * that the file descriptor is invalid after this callback has been executed.
399  *
400  * If this callback does not close the file descriptor, the server will leak
401  * file descriptors.
402  *
403  * @param[in] el to remove event handlers from.
404  * @param[in] h to close.
405  * @param[in] uctx User context.
406  */
408 {
409  fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
410 
411  /*
412  * The destructor will free the redisAsyncContext
413  * which'll close the connection, after removing
414  * it from the event loop.
415  */
416  talloc_free(our_h);
417 }
418 
419 /** Allocate an async redis I/O connection
420  *
421  */
423  connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf,
424  char const *log_prefix)
425 {
426  connection_t *conn;
427  /*
428  * We don't specify an open callback
429  * as hiredis handles switching over
430  * all the I/O handlers internally
431  * within hireds, and calls us when
432  * the connection is open.
433  */
434  conn = connection_alloc(ctx, el,
436  .init = _redis_io_connection_init,
439  },
440  conn_conf,
441  log_prefix,
442  io_conf);
443  if (!conn) return NULL;
444 
445  return conn;
446 }
447 
448 /** Return the redisAsyncContext associated with the connection
449  *
450  * This is needed to issue commands to the redis server.
451  *
452  * @param[in] conn To retrieve async ctx from.
453  * @return The async ctx.
454  */
456 {
457  fr_redis_handle_t *h = conn->h;
458  return h->ac;
459 }
#define UNUSED
Definition: build.h:313
connection_state_t
Definition: connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition: connection.h:54
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition: connection.h:50
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition: connection.h:53
@ CONNECTION_FAILED
Connection is being reconnected because it failed.
Definition: connection.h:84
Holds a complete set of functions for a connection.
Definition: connection.h:186
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
static fr_time_delta_t timeout
Definition: dhcpclient.c:54
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:275
#define fr_event_fd_insert(...)
Definition: event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition: event.h:62
#define fr_event_timer_in(...)
Definition: event.h:255
static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, int fd_errno, void *uctx)
Redis FD errored - Automatically removes registered events.
Definition: io.c:98
static void _redis_io_del_write(void *uctx)
De-register FD for writes.
Definition: io.c:188
static void _redis_io_del_read(void *uctx)
De-register FD for reads.
Definition: io.c:166
static int _redis_handle_free(fr_redis_handle_t *h)
Free the redis async context when the handle is freed.
Definition: io.c:281
static void _redis_io_add_write(void *uctx)
Register FD for writes.
Definition: io.c:177
static connection_state_t _redis_io_connection_shutdown(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Gracefully signal that the connection should shutdown.
Definition: io.c:386
static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became writable.
Definition: io.c:85
static void _redis_io_free(void *uctx)
Handle freeing the redisAsyncContext.
Definition: io.c:248
static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became readable.
Definition: io.c:72
connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition: io.c:422
static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is live.
Definition: io.c:60
static void _redis_io_add_read(void *uctx)
Register FD for reads.
Definition: io.c:155
static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
Configures async I/O callbacks for an existing redisAsyncContext.
Definition: io.c:261
CC_NO_UBSAN(function)
Callback for the initialise state.
Definition: io.c:305
static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is dead.
Definition: io.c:36
redisAsyncContext * fr_redis_connection_get_async_ctx(connection_t *conn)
Return the redisAsyncContext associated with the connection.
Definition: io.c:455
static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
Deal with the method hiredis uses to register/unregister interest in a file descriptor.
Definition: io.c:115
static void _redis_io_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Notification that the connection has errored and must be closed.
Definition: io.c:407
bool read_set
We're listening for reads.
Definition: io.h:67
redisAsyncContext * ac
Async handle for hiredis.
Definition: io.h:74
fr_event_timer_t const * timer
Connection timer.
Definition: io.h:71
fr_dlist_head_t ignore
Contains SQNs for responses that should be ignored.
Definition: io.h:76
bool ignore_disconnect_cb
Ensure that redisAsyncFree doesn't cause a callback loop.
Definition: io.h:69
bool write_set
We're listening for writes.
Definition: io.h:68
Store I/O state.
Definition: io.h:66
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG4(_fmt,...)
Definition: log.h:267
talloc_free(reap)
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1260
Stores all information relating to an event list.
Definition: event.c:411
unsigned short uint16_t
Definition: merged_model.c:31
static rs_t * conf
Definition: radsniff.c:53
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
Definition: connection.c:1167
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
Definition: connection.c:1512
void connection_signal_connected(connection_t *conn)
Asynchronously signal that the connection is open.
Definition: connection.c:1137
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define talloc_get_type_abort_const
Definition: talloc.h:282
static fr_time_delta_t fr_time_delta_from_timeval(struct timeval const *tv)
Definition: time.h:597
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
static fr_event_list_t * el
#define fr_box_time_delta(_val)
Definition: value.h:343