The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
io.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 476aefdc5a50be87eb41c4af3de90d63bcfa9a3b $
19 * @file lib/redis/io.c
20 * @brief Common functions for interacting with Redis via hiredis
21 *
22 * @copyright 2019 The FreeRADIUS server project
23 * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24 *
25 * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26 */
27
28#include <freeradius-devel/redis/io.h>
29#include <freeradius-devel/util/debug.h>
30
31#include <hiredis/async.h>
32
33/** Called by hiredis to indicate the connection is dead
34 *
35 */
36static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
37{
38 connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
39 fr_redis_handle_t *h = conn->h;
40
41 /*
42 * redisAsyncFree was called with a live
43 * connection, but inside the talloc
44 * destructor of the fr_redis_handle_t.
45 *
46 * Don't signal the connection state
47 * machine that it needs reconnecting,
48 * the connection is being destroyed.
49 */
50 if (h->ignore_disconnect_cb) return;
51
52 DEBUG4("Signalled by hiredis, connection disconnected");
53
55}
56
57/** Called by hiredis to indicate the connection is live
58 *
59 */
60static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
61{
62 connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
63
64 DEBUG4("Signalled by hiredis, connection is open");
65
67}
68
69/** Redis FD became readable
70 *
71 */
72static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
73{
75 fr_redis_handle_t *h = conn->h;
76
77 DEBUG4("redis handle %p - FD %i now readable", h, fd);
78
79 redisAsyncHandleRead(h->ac);
80}
81
82/** Redis FD became writable
83 *
84 */
85static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
86{
88 fr_redis_handle_t *h = conn->h;
89
90 DEBUG4("redis handle %p - FD %i now writable", h, fd);
91
92 redisAsyncHandleWrite(h->ac);
93}
94
95/** Redis FD errored - Automatically removes registered events
96 *
97 */
99 int fd_errno, void *uctx)
100{
101 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
102 fr_redis_handle_t *h = conn->h;
103
104 DEBUG4("redis handle %p - FD %i errored: %s", h, fd, fr_syserror(fd_errno));
105
106 /*
107 * Connection state machine will handle reconnecting
108 */
110}
111
112/** Deal with the method hiredis uses to register/unregister interest in a file descriptor
113 *
114 */
115static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
116{
117 redisContext *c = &(h->ac->c);
118 fr_event_list_t *el = conn->el;
119
120 /*
121 * hiredis doesn't even attempt to dedup registration
122 * requests *sigh*...
123 */
124 if ((h->read_set == read) && (h->write_set == write)) return;
125
126 if (!read && !write) {
127 DEBUG4("redis handle %p - De-registering FD %i", h, c->fd);
128
129 if (fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO) < 0) {
130 PERROR("redis handle %p - De-registration failed for FD %i", h, c->fd);
131 }
132 return;
133 }
134
135 DEBUG4("redis handle %p - Registered for %s%serror events on FD %i",
136 h, read ? "read+" : "", write ? "write+" : "", c->fd);
137
138 if (fr_event_fd_insert(h, NULL, el, c->fd,
139 read ? _redis_io_service_readable : NULL,
140 write ? _redis_io_service_writable : NULL,
142 conn) < 0) {
143 PERROR("redis handle %p - Registration failed for %s%serror events on FD %i",
144 h, read ? "read+" : "", write ? "write+" : "", c->fd);
145 return;
146 }
147
148 h->read_set = read;
149 h->write_set = write;
150}
151
152/** Register FD for reads
153 *
154 */
155static void _redis_io_add_read(void *uctx)
156{
157 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
158 fr_redis_handle_t *h = conn->h;
159
160 _redis_io_common(conn, h, true, h->write_set);
161}
162
163/** De-register FD for reads
164 *
165 */
166static void _redis_io_del_read(void *uctx)
167{
168 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
169 fr_redis_handle_t *h = conn->h;
170
171 _redis_io_common(conn, h, false, h->write_set);
172}
173
174/** Register FD for writes
175 *
176 */
177static void _redis_io_add_write(void *uctx)
178{
179 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
180 fr_redis_handle_t *h = conn->h;
181
182 _redis_io_common(conn, h, h->read_set, true);
183}
184
185/** De-register FD for writes
186 *
187 */
188static void _redis_io_del_write(void *uctx)
189{
190 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
191 fr_redis_handle_t *h = conn->h;
192
193 _redis_io_common(conn, h, h->read_set, false);
194}
195
196#ifdef HAVE_REDIS_TIMEOUT
197/** Connection timer expired
198 *
199 */
200static void _redis_io_service_timer_expired(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
201{
203 fr_redis_handle_t *h = conn->h;
204
205 DEBUG4("redis handle %p - Timeout", h);
206
207 redisAsyncHandleTimeout(h->ac);
208}
209
210/** Modify the connection I/O timer
211 *
212 */
213static void _redis_io_timer_modify(void *uctx, struct timeval tv)
214{
215 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
216 fr_redis_handle_t *h = conn->h;
217 fr_time_delta_t timeout;
218
219 timeout = fr_time_delta_from_timeval(&tv);
220
221 DEBUG4("redis handle %p - Timeout in %pV seconds", h, fr_box_time_delta(timeout));
222
223 if (fr_event_timer_in(h, conn->el, &h->timer,
224 timeout, _redis_io_service_timer_expired, conn) < 0) {
225 PERROR("redis timeout %p - Failed adding timeout", h);
226 }
227}
228#endif
229
230/** Handle freeing the redisAsyncContext
231 *
232 * delRead and delWrite don't seem to be called when the redisAsyncContext is freed
233 *
234 * As the IO events must be removed from the event loop *before* the FD is closed
235 * and as the IO events will only be automatically de-registered when when the
236 * fr_redis_handle_t is freed.
237 *
238 * Unfortunately the destructor for the fr_redis_handle_t will be run before
239 * the IO events are de-registered, which'll free the redisAsycCtx, which'll close
240 * the FD.
241 *
242 * This means there'd be a brief period of time between the FD is closed, and
243 * it being removed from the event loop.
244 *
245 * We use the cleanup callback (which is called before the FD is closed) to remove
246 * the events now, and ensure there's no chance of issues.
247 */
248static void _redis_io_free(void *uctx)
249{
250 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
251 fr_redis_handle_t *h = conn->h;
252
253 DEBUG4("redis handle %p - Freed", h);
254
255 _redis_io_common(conn, h, false, false);
256}
257
258/** Configures async I/O callbacks for an existing redisAsyncContext
259 *
260 */
261static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
262{
263 if (ac->ev.data != NULL) return REDIS_ERR;
264
265 ac->ev.addRead = _redis_io_add_read;
266 ac->ev.delRead = _redis_io_del_read;
267 ac->ev.addWrite = _redis_io_add_write;
268 ac->ev.delWrite = _redis_io_del_write;
269#ifdef HAVE_REDIS_TIMEOUT
270 ac->ev.scheduleTimer = _redis_io_timer_modify;
271#endif
272 ac->ev.cleanup = _redis_io_free;
273 memcpy(&ac->ev.data, &conn, sizeof(ac->ev.data));
274
275 return REDIS_OK;
276}
277
278/** Free the redis async context when the handle is freed
279 *
280 */
282{
283 /*
284 * Don't fire the reconnect callback if we're
285 * freeing the handle.
286 */
287 h->ignore_disconnect_cb = true;
288 if (h->ac) redisAsyncFree(h->ac);
289
290 return 0;
291}
292
293/** Callback for the initialise state
294 *
295 * Should attempt to open a non-blocking connection and return it in h_out.
296 *
297 * @param[out] h_out Where to write the new handle
298 * @param[in] conn This connection. Opaque, should only be used for
299 * signalling the connection state machine.
300 * @param[in] uctx User context.
301 * @return
302 * - #CONNECTION_STATE_CONNECTING if a file descriptor was successfully created.
303 * - #CONNECTION_STATE_FAILED if we could not open a valid handle.
304 */
305CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
306static connection_state_t _redis_io_connection_init(void **h_out, connection_t *conn, void *uctx)
307{
308 fr_redis_io_conf_t *conf = uctx;
309 char const *host = conf->hostname;
310 uint16_t port = conf->port;
312 int ret;
313
314 /*
315 * Allocate a structure to wrap the
316 * redis async context.
317 */
318 MEM(h = talloc_zero(conn, fr_redis_handle_t));
319 talloc_set_destructor(h, _redis_handle_free);
320
321 h->ac = redisAsyncConnect(host, port);
322 if (!h->ac) {
323 ERROR("Failed allocating handle for %s:%u", host, port);
325 }
326
327 if (h->ac->err) {
328 ERROR("Failed allocating handle for %s:%u: %s", host, port, h->ac->errstr);
329 error:
330 redisAsyncFree(h->ac);
332 }
333
334 /*
335 * Store the connection in private data,
336 * so we can use it for signalling.
337 *
338 * Comments in the redis src indicate
339 * it doesn't mess with this.
340 */
341 memcpy(&h->ac->data, &conn, sizeof(h->ac->data));
342
343 /*
344 * Handle has to be associated with the
345 * conn in case I/O handlers want to get
346 * at it.
347 */
348 *h_out = h;
349
350 /*
351 * Install the I/O service functions
352 *
353 * Event library must be set first
354 * before calling redisAsyncSetConnectCallback
355 * as just setting the callback triggers
356 * I/O events to be registered.
357 */
358 fr_redis_io_setup(h->ac, conn);
359
360 /*
361 * Setup callbacks so we're notified
362 * when the connection state changes.
363 *
364 * We then signal the connection state
365 * machine, to let it handle
366 * reconnecting.
367 */
368 ret = redisAsyncSetConnectCallback(h->ac, _redis_connected);
369 if (ret != REDIS_OK) {
370 ERROR("Failed setting connected callback: Error %i", ret);
371 goto error;
372 }
373 if (redisAsyncSetDisconnectCallback(h->ac, _redis_disconnected) != REDIS_OK) {
374 ERROR("Failed setting disconnected callback: Error %i", ret);
375 goto error;
376 }
377
379
381}
382
383/** Gracefully signal that the connection should shutdown
384 *
385 */
387{
388 fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
389
390 redisAsyncDisconnect(our_h->ac); /* Should not free the handle */
391
393}
394
395/** Notification that the connection has errored and must be closed
396 *
397 * This should be used to close the file descriptor. It is assumed
398 * that the file descriptor is invalid after this callback has been executed.
399 *
400 * If this callback does not close the file descriptor, the server will leak
401 * file descriptors.
402 *
403 * @param[in] el to remove event handlers from.
404 * @param[in] h to close.
405 * @param[in] uctx User context.
406 */
408{
409 fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
410
411 /*
412 * The destructor will free the redisAsyncContext
413 * which'll close the connection, after removing
414 * it from the event loop.
415 */
416 talloc_free(our_h);
417}
418
419/** Allocate an async redis I/O connection
420 *
421 */
423 connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf,
424 char const *log_prefix)
425{
426 connection_t *conn;
427 /*
428 * We don't specify an open callback
429 * as hiredis handles switching over
430 * all the I/O handlers internally
431 * within hireds, and calls us when
432 * the connection is open.
433 */
434 conn = connection_alloc(ctx, el,
439 },
440 conn_conf,
441 log_prefix,
442 io_conf);
443 if (!conn) return NULL;
444
445 return conn;
446}
447
448/** Return the redisAsyncContext associated with the connection
449 *
450 * This is needed to issue commands to the redis server.
451 *
452 * @param[in] conn To retrieve async ctx from.
453 * @return The async ctx.
454 */
456{
457 fr_redis_handle_t *h = conn->h;
458 return h->ac;
459}
#define CC_NO_UBSAN(_sanitize)
Definition build.h:426
#define UNUSED
Definition build.h:315
connection_state_t
Definition connection.h:45
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:54
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:50
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:53
@ CONNECTION_FAILED
Connection is being reconnected because it failed.
Definition connection.h:84
Holds a complete set of functions for a connection.
Definition connection.h:186
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:275
#define fr_event_fd_insert(...)
Definition event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:62
#define fr_event_timer_in(...)
Definition event.h:255
static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, int fd_errno, void *uctx)
Redis FD errored - Automatically removes registered events.
Definition io.c:98
static void _redis_io_del_write(void *uctx)
De-register FD for writes.
Definition io.c:188
redisAsyncContext * fr_redis_connection_get_async_ctx(connection_t *conn)
Return the redisAsyncContext associated with the connection.
Definition io.c:455
static void _redis_io_del_read(void *uctx)
De-register FD for reads.
Definition io.c:166
static int _redis_handle_free(fr_redis_handle_t *h)
Free the redis async context when the handle is freed.
Definition io.c:281
static void _redis_io_add_write(void *uctx)
Register FD for writes.
Definition io.c:177
static connection_state_t _redis_io_connection_shutdown(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Gracefully signal that the connection should shutdown.
Definition io.c:386
static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became writable.
Definition io.c:85
static connection_state_t _redis_io_connection_init(void **h_out, connection_t *conn, void *uctx)
Callback for the initialise state.
Definition io.c:306
static void _redis_io_free(void *uctx)
Handle freeing the redisAsyncContext.
Definition io.c:248
static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became readable.
Definition io.c:72
static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is live.
Definition io.c:60
connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition io.c:422
static void _redis_io_add_read(void *uctx)
Register FD for reads.
Definition io.c:155
static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
Configures async I/O callbacks for an existing redisAsyncContext.
Definition io.c:261
static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is dead.
Definition io.c:36
static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
Deal with the method hiredis uses to register/unregister interest in a file descriptor.
Definition io.c:115
static void _redis_io_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Notification that the connection has errored and must be closed.
Definition io.c:407
bool read_set
We're listening for reads.
Definition io.h:67
redisAsyncContext * ac
Async handle for hiredis.
Definition io.h:74
fr_event_timer_t const * timer
Connection timer.
Definition io.h:71
fr_dlist_head_t ignore
Contains SQNs for responses that should be ignored.
Definition io.h:76
bool ignore_disconnect_cb
Ensure that redisAsyncFree doesn't cause a callback loop.
Definition io.h:69
bool write_set
We're listening for writes.
Definition io.h:68
Store I/O state.
Definition io.h:66
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG4(_fmt,...)
Definition log.h:267
talloc_free(reap)
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1260
Stores all information relating to an event list.
Definition event.c:411
unsigned short uint16_t
static rs_t * conf
Definition radsniff.c:53
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
void connection_signal_connected(connection_t *conn)
Asynchronously signal that the connection is open.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define talloc_get_type_abort_const
Definition talloc.h:282
static fr_time_delta_t fr_time_delta_from_timeval(struct timeval const *tv)
Definition time.h:597
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
static fr_event_list_t * el
#define fr_box_time_delta(_val)
Definition value.h:343