The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
io.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or (at
5  * your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 8cebe08fc00ccbefc09327065594498db07010c5 $
19  * @file lib/redis/io.c
20  * @brief Common functions for interacting with Redis via hiredis
21  *
22  * @copyright 2019 The FreeRADIUS server project
23  * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24  *
25  * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26  */
27 
28 #include <freeradius-devel/redis/io.h>
29 #include <freeradius-devel/util/debug.h>
30 
31 #include <hiredis/async.h>
32 
33 /** Called by hiredis to indicate the connection is dead
34  * ac->data must hold the fr_connection_t (enforced by talloc_get_type_abort); status is ignored.
35  */
36 static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
37 {
38  fr_connection_t *conn = talloc_get_type_abort(ac->data, fr_connection_t);
39  fr_redis_handle_t *h = conn->h;
40 
41  /*
42  * redisAsyncFree was called with a live
43  * connection, but inside the talloc
44  * destructor of the fr_redis_handle_t.
45  *
46  * Don't signal the connection state
47  * machine that it needs reconnecting,
48  * the connection is being destroyed.
49  */
50  if (h->ignore_disconnect_cb) return;
51 
52  DEBUG4("Signalled by hiredis, connection disconnected");
53  /* NOTE(review): listing line 54 is not visible here; presumably the reconnect signal to the state machine - confirm */
55 }
56 
57 /** Called by hiredis to indicate the connection is live
58  * ac->data must hold the fr_connection_t (enforced by talloc_get_type_abort); status is ignored.
59  */
60 static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
61 {
62  fr_connection_t *conn = talloc_get_type_abort(ac->data, fr_connection_t);
63 
64  DEBUG4("Signalled by hiredis, connection is open");
65  /* NOTE(review): listing line 66 is not visible here; presumably conn is signalled as connected - confirm */
67 }
68 
69 /** Redis FD became readable
70  * Logs the event for the handle, then lets hiredis process it via redisAsyncHandleRead(); el and flags are unused.
71  */
72 static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
73 { /* NOTE(review): the declaration of conn (listing line 74) is not visible in this view */
75  fr_redis_handle_t *h = conn->h;
76 
77  DEBUG4("redis handle %p - FD %i now readable", h, fd);
78 
79  redisAsyncHandleRead(h->ac);
80 }
81 
82 /** Redis FD became writable
83  * Logs the event for the handle, then lets hiredis process it via redisAsyncHandleWrite(); el and flags are unused.
84  */
85 static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
86 { /* NOTE(review): the declaration of conn (listing line 87) is not visible in this view */
88  fr_redis_handle_t *h = conn->h;
89 
90  DEBUG4("redis handle %p - FD %i now writable", h, fd);
91 
92  redisAsyncHandleWrite(h->ac);
93 }
94 
95 /** Redis FD errored - Automatically removes registered events
96  * uctx must be the fr_connection_t; fd_errno is logged in readable form via fr_syserror().
97  */
98 static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags,
99  int fd_errno, void *uctx)
100 {
101  fr_connection_t *conn = talloc_get_type_abort(uctx, fr_connection_t);
102  fr_redis_handle_t *h = conn->h;
103 
104  DEBUG4("redis handle %p - FD %i errored: %s", h, fd, fr_syserror(fd_errno));
105 
106  /*
107  * Connection state machine will handle reconnecting
108  * NOTE(review): the statement this comment describes (listing line 109) is not visible in this view */
110 }
111 
112 /** Deal with the method hiredis uses to register/unregister interest in a file descriptor
113  * Applies the requested read/write interest to the handle's FD: no-op if unchanged, delete if both are false, otherwise insert with the matching callbacks.
114  */
115 static void _redis_io_common(fr_connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
116 {
117  redisContext *c = &(h->ac->c);
118  fr_event_list_t *el = conn->el;
119 
120  /*
121  * hiredis doesn't even attempt to dedup registration
122  * requests *sigh*...
123  */
124  if ((h->read_set == read) && (h->write_set == write)) return;
125 
126  if (!read && !write) {
127  DEBUG4("redis handle %p - De-registering FD %i", h, c->fd);
128 
129  if (fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO) < 0) {
130  PERROR("redis handle %p - De-registration failed for FD %i", h, c->fd);
131  }
132  return; /* NOTE(review): read_set/write_set keep their old values on this path, so a later add with those same values is dropped by the dedup check above - confirm this is intended */
133  }
134  /* The "Registered" message below is logged before the insert is attempted; a failure is reported separately. */
135  DEBUG4("redis handle %p - Registered for %s%serror events on FD %i",
136  h, read ? "read+" : "", write ? "write+" : "", c->fd);
137 
138  if (fr_event_fd_insert(h, NULL, el, c->fd,
139  read ? _redis_io_service_readable : NULL,
140  write ? _redis_io_service_writable : NULL, /* NOTE(review): listing line 141 (an argument between this one and conn) is not visible in this view */
142  conn) < 0) {
143  PERROR("redis handle %p - Registration failed for %s%serror events on FD %i",
144  h, read ? "read+" : "", write ? "write+" : "", c->fd);
145  return;
146  }
147 
148  h->read_set = read; /* flags are only recorded once the insert succeeded */
149  h->write_set = write;
150 }
151 
152 /** Start watching the FD for reads (installed as ac->ev.addRead)
153  * uctx must be the fr_connection_t; the handle's current write interest is carried over unchanged.
154  */
155 static void _redis_io_add_read(void *uctx)
156 {
157  fr_connection_t *our_conn = talloc_get_type_abort(uctx, fr_connection_t);
158  fr_redis_handle_t *handle = our_conn->h;
159  bool const want_write = handle->write_set;
160  _redis_io_common(our_conn, handle, true, want_write);
161 }
162 
163 /** Stop watching the FD for reads (installed as ac->ev.delRead)
164  * uctx must be the fr_connection_t; the handle's current write interest is carried over unchanged.
165  */
166 static void _redis_io_del_read(void *uctx)
167 {
168  fr_connection_t *our_conn = talloc_get_type_abort(uctx, fr_connection_t);
169  fr_redis_handle_t *handle = our_conn->h;
170  bool const want_write = handle->write_set;
171  _redis_io_common(our_conn, handle, false, want_write);
172 }
173 
174 /** Start watching the FD for writes (installed as ac->ev.addWrite)
175  * uctx must be the fr_connection_t; the handle's current read interest is carried over unchanged.
176  */
177 static void _redis_io_add_write(void *uctx)
178 {
179  fr_connection_t *our_conn = talloc_get_type_abort(uctx, fr_connection_t);
180  fr_redis_handle_t *handle = our_conn->h;
181  bool const want_read = handle->read_set;
182  _redis_io_common(our_conn, handle, want_read, true);
183 }
184 
185 /** Stop watching the FD for writes (installed as ac->ev.delWrite)
186  * uctx must be the fr_connection_t; the handle's current read interest is carried over unchanged.
187  */
188 static void _redis_io_del_write(void *uctx)
189 {
190  fr_connection_t *our_conn = talloc_get_type_abort(uctx, fr_connection_t);
191  fr_redis_handle_t *handle = our_conn->h;
192  bool const want_read = handle->read_set;
193  _redis_io_common(our_conn, handle, want_read, false);
194 }
195 
196 #ifdef HAVE_REDIS_TIMEOUT
197 /** Connection timer expired
198  * Logs the timeout for the handle, then hands it to hiredis via redisAsyncHandleTimeout(); el and now are unused.
199  */
200 static void _redis_io_service_timer_expired(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
201 { /* NOTE(review): the declaration of conn (listing line 202) is not visible in this view */
203  fr_redis_handle_t *h = conn->h;
204 
205  DEBUG4("redis handle %p - Timeout", h);
206 
207  redisAsyncHandleTimeout(h->ac);
208 }
209 
210 /** Modify the connection I/O timer
211  * Arms h->timer on conn->el so _redis_io_service_timer_expired runs after timeout; a failure to add the timer is only logged.
212  */
213 static void _redis_io_timer_modify(void *uctx, struct timeval tv)
214 {
215  fr_connection_t *conn = talloc_get_type_abort(uctx, fr_connection_t);
216  fr_redis_handle_t *h = conn->h;
218  /* NOTE(review): listing lines 217 and 219, where timeout is presumably declared and derived from tv, are not visible - confirm */
220 
221  DEBUG4("redis handle %p - Timeout in %pV seconds", h, fr_box_time_delta(timeout));
222 
223  if (fr_event_timer_in(h, conn->el, &h->timer,
224  timeout, _redis_io_service_timer_expired, conn) < 0) {
225  PERROR("redis timeout %p - Failed adding timeout", h);
226  }
227 }
228 #endif
229 
230 /** Cleanup hook run when the redisAsyncContext is being freed
231  *
232  * hiredis doesn't appear to call delRead/delWrite when the redisAsyncContext is freed.
233  *
234  * The I/O events have to leave the event loop *before* the FD is closed, but
235  * left alone they are only de-registered automatically once the
236  * fr_redis_handle_t itself has been freed.
237  *
238  * The fr_redis_handle_t destructor runs before that automatic de-registration
239  * and frees the redisAsyncContext, which in turn closes
240  * the FD.
241  *
242  * That would leave a short window in which the FD is closed but still
243  * present in the event loop.
244  *
245  * The cleanup callback is invoked before the FD is closed, so the events are
246  * removed here, which closes that window.
247  */
248 static void _redis_io_free(void *uctx)
249 {
250  fr_connection_t *our_conn = talloc_get_type_abort(uctx, fr_connection_t);
251  fr_redis_handle_t *handle = our_conn->h;
252 
253  DEBUG4("redis handle %p - Freed", handle);
254 
255  _redis_io_common(our_conn, handle, false, false); /* drop both read and write interest */
256 }
257 
258 /** Install our event-loop hooks on an existing redisAsyncContext
259  * Returns REDIS_ERR if ac->ev.data is already set; otherwise stores conn there, installs the hooks and returns REDIS_OK.
260  */
261 static int fr_redis_io_setup(redisAsyncContext *ac, fr_connection_t const *conn)
262 {
263  if (ac->ev.data) return REDIS_ERR; /* ev.data is already in use */
264 
265  memcpy(&ac->ev.data, &conn, sizeof(ac->ev.data)); /* copies the pointer value itself, sidestepping the const qualifier */
266  ac->ev.cleanup = _redis_io_free;
267 #ifdef HAVE_REDIS_TIMEOUT
268  ac->ev.scheduleTimer = _redis_io_timer_modify;
269 #endif
270  ac->ev.addWrite = _redis_io_add_write;
271  ac->ev.delWrite = _redis_io_del_write;
272  ac->ev.addRead = _redis_io_add_read;
273  ac->ev.delRead = _redis_io_del_read;
274 
275  return REDIS_OK;
276 }
277 
278 /** Free the redis async context when the handle is freed
279  *
280  */
282 {
283  /*
284  * Don't fire the reconnect callback if we're
285  * freeing the handle.
286  */
287  h->ignore_disconnect_cb = true;
288  if (h->ac) redisAsyncFree(h->ac);
289 
290  return 0;
291 }
292 
293 /** Callback for the initialise state
294  *
295  * Should attempt to open a non-blocking connection and return it in h_out.
296  *
297  * @param[out] h_out Where to write the new handle (a fr_redis_handle_t allocated under conn).
298  * @param[in] conn This connection. Opaque, should only be used for
299  * signalling the connection state machine.
300  * @param[in] uctx User context; read here as a fr_redis_io_conf_t supplying hostname and port.
301  * @return
302  * - #FR_CONNECTION_STATE_CONNECTING if a file descriptor was successfully created.
303  * - #FR_CONNECTION_STATE_FAILED if we could not open a valid handle.
304  */
305 static fr_connection_state_t _redis_io_connection_init(void **h_out, fr_connection_t *conn, void *uctx)
306 {
307  fr_redis_io_conf_t *conf = uctx;
308  char const *host = conf->hostname;
309  uint16_t port = conf->port; /* NOTE(review): the declaration of h (listing line 310) is not visible in this view */
311  int ret;
312 
313  /*
314  * Allocate a structure to wrap the
315  * redis async context.
316  */
317  MEM(h = talloc_zero(conn, fr_redis_handle_t));
318  talloc_set_destructor(h, _redis_handle_free);
319 
320  h->ac = redisAsyncConnect(host, port);
321  if (!h->ac) {
322  ERROR("Failed allocating handle for %s:%u", host, port);
324  }
325 
326  if (h->ac->err) {
327  ERROR("Failed allocating handle for %s:%u: %s", host, port, h->ac->errstr);
328  error:
329  redisAsyncFree(h->ac); /* NOTE(review): h keeps its destructor, which frees h->ac again when it is non-NULL; listing line 330 is not visible - confirm h->ac is cleared or h is released safely */
331  }
332 
333  /*
334  * Store the connection in private data,
335  * so we can use it for signalling.
336  *
337  * Comments in the redis src indicate
338  * it doesn't mess with this.
339  */
340  memcpy(&h->ac->data, &conn, sizeof(h->ac->data));
341 
342  /*
343  * Handle has to be associated with the
344  * conn in case I/O handlers want to get
345  * at it.
346  */
347  *h_out = h; /* set before the callbacks below are installed, so it is already written if they jump to error */
348 
349  /*
350  * Install the I/O service functions
351  *
352  * Event library must be set first
353  * before calling redisAsyncSetConnectCallback
354  * as just setting the callback triggers
355  * I/O events to be registered.
356  */
357  fr_redis_io_setup(h->ac, conn); /* NOTE(review): the return value (REDIS_ERR when ev.data is already set) is not checked */
358 
359  /*
360  * Setup callbacks so we're notified
361  * when the connection state changes.
362  *
363  * We then signal the connection state
364  * machine, to let it handle
365  * reconnecting.
366  */
367  ret = redisAsyncSetConnectCallback(h->ac, _redis_connected);
368  if (ret != REDIS_OK) {
369  ERROR("Failed setting connected callback: Error %i", ret);
370  goto error;
371  }
372  if (redisAsyncSetDisconnectCallback(h->ac, _redis_disconnected) != REDIS_OK) {
373  ERROR("Failed setting disconnected callback: Error %i", ret); /* NOTE(review): ret still holds the connect-callback result (REDIS_OK on this path), not this call's return value */
374  goto error;
375  }
376 
378 
380 }
381 
382 /** Gracefully signal that the connection should shutdown
383  *
384  */
386 {
387  fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
388 
389  redisAsyncDisconnect(our_h->ac); /* Should not free the handle */
390 
392 }
393 
394 /** Close a connection that has errored
395  *
396  * This callback is where the file descriptor gets closed; the file
397  * descriptor is assumed to be invalid once it has returned.
398  *
399  * Were it to return without closing the file descriptor, the server would
400  * leak file descriptors.
401  *
402  * @param[in] el Event list (unused here).
403  * @param[in] h Handle to close, must be a fr_redis_handle_t.
404  * @param[in] uctx User context (unused here).
405  */
406 static void _redis_io_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
407 {
408  /*
409  * Freeing the handle runs its talloc destructor,
410  * which frees the redisAsyncContext; that takes
411  * the FD out of the event loop and then closes
412  * the connection.
413  */
414  fr_redis_handle_t *handle = talloc_get_type_abort(h, fr_redis_handle_t);
415  talloc_free(handle);
416 }
417 
418 /** Allocate an async redis I/O connection
419  *
420  */
422  fr_connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf,
423  char const *log_prefix)
424 {
425  fr_connection_t *conn;
426  /*
427  * We don't specify an open callback
428  * as hiredis handles switching over
429  * all the I/O handlers internally
430  * within hiredis, and calls us when
431  * the connection is open.
432  */
433  conn = fr_connection_alloc(ctx, el,
438  },
439  conn_conf,
440  log_prefix,
441  io_conf);
442  if (!conn) return NULL;
443 
444  return conn;
445 }
446 
447 /** Return the redisAsyncContext associated with the connection
448  *
449  * This is needed to issue commands to the redis server.
450  *
451  * @param[in] conn To retrieve async ctx from.
452  * @return The async ctx.
453  */
455 {
456  fr_redis_handle_t *h = conn->h;
457  return h->ac;
458 }
#define UNUSED
Definition: build.h:313
fr_connection_state_t
Definition: connection.h:45
@ FR_CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition: connection.h:50
@ FR_CONNECTION_STATE_FAILED
Connection has failed.
Definition: connection.h:54
@ FR_CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition: connection.h:53
@ FR_CONNECTION_FAILED
Connection is being reconnected because it failed.
Definition: connection.h:84
Holds a complete set of functions for a connection.
Definition: connection.h:186
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
static fr_time_delta_t timeout
Definition: dhcpclient.c:54
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:275
#define fr_event_fd_insert(...)
Definition: event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions.
Definition: event.h:62
#define fr_event_timer_in(...)
Definition: event.h:255
static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, int fd_errno, void *uctx)
Redis FD errored - Automatically removes registered events.
Definition: io.c:98
static void _redis_io_del_write(void *uctx)
De-register FD for writes.
Definition: io.c:188
static void _redis_io_del_read(void *uctx)
De-register FD for reads.
Definition: io.c:166
static int _redis_handle_free(fr_redis_handle_t *h)
Free the redis async context when the handle is freed.
Definition: io.c:281
static fr_connection_state_t _redis_io_connection_init(void **h_out, fr_connection_t *conn, void *uctx)
Callback for the initialise state.
Definition: io.c:305
static void _redis_io_add_write(void *uctx)
Register FD for writes.
Definition: io.c:177
static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became writable.
Definition: io.c:85
static void _redis_io_free(void *uctx)
Handle freeing the redisAsyncContext.
Definition: io.c:248
static int fr_redis_io_setup(redisAsyncContext *ac, fr_connection_t const *conn)
Configures async I/O callbacks for an existing redisAsyncContext.
Definition: io.c:261
redisAsyncContext * fr_redis_connection_get_async_ctx(fr_connection_t *conn)
Return the redisAsyncContext associated with the connection.
Definition: io.c:454
static void _redis_io_common(fr_connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
Deal with the method hiredis uses to register/unregister interest in a file descriptor.
Definition: io.c:115
static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became readable.
Definition: io.c:72
static fr_connection_state_t _redis_io_connection_shutdown(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Gracefully signal that the connection should shutdown.
Definition: io.c:385
static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is live.
Definition: io.c:60
static void _redis_io_add_read(void *uctx)
Register FD for reads.
Definition: io.c:155
static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is dead.
Definition: io.c:36
fr_connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition: io.c:421
static void _redis_io_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Notification that the connection has errored and must be closed.
Definition: io.c:406
bool read_set
We're listening for reads.
Definition: io.h:67
redisAsyncContext * ac
Async handle for hiredis.
Definition: io.h:74
fr_event_timer_t const * timer
Connection timer.
Definition: io.h:71
fr_dlist_head_t ignore
Contains SQNs for responses that should be ignored.
Definition: io.h:76
bool ignore_disconnect_cb
Ensure that redisAsyncFree doesn't cause a callback loop.
Definition: io.h:69
bool write_set
We're listening for writes.
Definition: io.h:68
Store I/O state.
Definition: io.h:66
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG4(_fmt,...)
Definition: log.h:267
talloc_free(reap)
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1253
Stores all information relating to an event list.
Definition: event.c:411
unsigned short uint16_t
Definition: merged_model.c:31
static rs_t * conf
Definition: radsniff.c:53
void fr_connection_signal_connected(fr_connection_t *conn)
Asynchronously signal that the connection is open.
Definition: connection.c:1136
fr_connection_t * fr_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_connection_funcs_t const *funcs, fr_connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
Definition: connection.c:1507
void fr_connection_signal_reconnect(fr_connection_t *conn, fr_connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
Definition: connection.c:1166
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define talloc_get_type_abort_const
Definition: talloc.h:270
static fr_time_delta_t fr_time_delta_from_timeval(struct timeval const *tv)
Definition: time.h:595
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
static fr_event_list_t * el
#define fr_box_time_delta(_val)
Definition: value.h:336