The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
io.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or (at
5 * your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 49f07b0c05c976ccc22ce1a5bbb740ae2a22dddd $
19 * @file lib/redis/io.c
20 * @brief Common functions for interacting with Redis via hiredis
21 *
22 * @copyright 2019 The FreeRADIUS server project
23 * @copyright 2019 Network RADIUS SAS (legal@networkradius.com)
24 *
25 * @author Arran Cudbard-Bell (a.cudbardb@freeradius.org)
26 */
27
28#include <freeradius-devel/redis/io.h>
29#include <freeradius-devel/util/debug.h>
30
31
32/** Called by hiredis to indicate the connection is dead
33 *
34 */
35static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
36{
37 connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
38 fr_redis_handle_t *h = conn->h;
39
40 /*
41 * redisAsyncFree was called with a live
42 * connection, but inside the talloc
43 * destructor of the fr_redis_handle_t.
44 *
45 * Don't signal the connection state
46 * machine that it needs reconnecting,
47 * the connection is being destroyed.
48 */
49 if (h->ignore_disconnect_cb) return;
50
51 DEBUG4("Signalled by hiredis, connection disconnected");
52
54}
55
56/** Called by hiredis to indicate the connection is live
57 *
58 */
59static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
60{
61 connection_t *conn = talloc_get_type_abort(ac->data, connection_t);
62
63 DEBUG4("Signalled by hiredis, connection is open");
64
66}
67
68/** Redis FD became readable
69 *
70 */
71static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
72{
74 fr_redis_handle_t *h = conn->h;
75
76 DEBUG4("redis handle %p - FD %i now readable", h, fd);
77
78 redisAsyncHandleRead(h->ac);
79}
80
81/** Redis FD became writable
82 *
83 */
84static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
85{
87 fr_redis_handle_t *h = conn->h;
88
89 DEBUG4("redis handle %p - FD %i now writable", h, fd);
90
91 redisAsyncHandleWrite(h->ac);
92}
93
94/** Redis FD errored - Automatically removes registered events
95 *
96 */
98 int fd_errno, void *uctx)
99{
100 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
101 fr_redis_handle_t *h = conn->h;
102
103 DEBUG4("redis handle %p - FD %i errored: %s", h, fd, fr_syserror(fd_errno));
104
105 /*
106 * Connection state machine will handle reconnecting
107 */
109}
110
111/** Deal with the method hiredis uses to register/unregister interest in a file descriptor
112 *
113 */
114static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
115{
116 redisContext *c = &(h->ac->c);
117 fr_event_list_t *el = conn->el;
118
119 /*
120 * hiredis doesn't even attempt to dedup registration
121 * requests *sigh*...
122 */
123 if ((h->read_set == read) && (h->write_set == write)) return;
124
125 if (!read && !write) {
126 DEBUG4("redis handle %p - De-registering FD %i", h, c->fd);
127
128 if (fr_event_fd_delete(el, c->fd, FR_EVENT_FILTER_IO) < 0) {
129 PERROR("redis handle %p - De-registration failed for FD %i", h, c->fd);
130 }
131 return;
132 }
133
134 DEBUG4("redis handle %p - Registered for %s%serror events on FD %i",
135 h, read ? "read+" : "", write ? "write+" : "", c->fd);
136
137 if (fr_event_fd_insert(h, NULL, el, c->fd,
138 read ? _redis_io_service_readable : NULL,
139 write ? _redis_io_service_writable : NULL,
141 conn) < 0) {
142 PERROR("redis handle %p - Registration failed for %s%serror events on FD %i",
143 h, read ? "read+" : "", write ? "write+" : "", c->fd);
144 return;
145 }
146
147 h->read_set = read;
148 h->write_set = write;
149}
150
151/** Register FD for reads
152 *
153 */
154static void _redis_io_add_read(void *uctx)
155{
156 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
157 fr_redis_handle_t *h = conn->h;
158
159 _redis_io_common(conn, h, true, h->write_set);
160}
161
162/** De-register FD for reads
163 *
164 */
165static void _redis_io_del_read(void *uctx)
166{
167 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
168 fr_redis_handle_t *h = conn->h;
169
170 _redis_io_common(conn, h, false, h->write_set);
171}
172
173/** Register FD for writes
174 *
175 */
176static void _redis_io_add_write(void *uctx)
177{
178 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
179 fr_redis_handle_t *h = conn->h;
180
181 _redis_io_common(conn, h, h->read_set, true);
182}
183
184/** De-register FD for writes
185 *
186 */
187static void _redis_io_del_write(void *uctx)
188{
189 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
190 fr_redis_handle_t *h = conn->h;
191
192 _redis_io_common(conn, h, h->read_set, false);
193}
194
195#ifdef HAVE_REDIS_TIMEOUT
196/** Connection timer expired
197 *
198 */
199static void _redis_io_service_timer_expired(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
200{
202 fr_redis_handle_t *h = conn->h;
203
204 DEBUG4("redis handle %p - Timeout", h);
205
206 redisAsyncHandleTimeout(h->ac);
207}
208
209/** Modify the connection I/O timer
210 *
211 */
212static void _redis_io_timer_modify(void *uctx, struct timeval tv)
213{
214 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
215 fr_redis_handle_t *h = conn->h;
216 fr_time_delta_t timeout;
217
218 timeout = fr_time_delta_from_timeval(&tv);
219
220 DEBUG4("redis handle %p - Timeout in %pV seconds", h, fr_box_time_delta(timeout));
221
222 if (fr_timer_in(h, conn->el, &h->timer,
223 timeout, _redis_io_service_timer_expired, conn) < 0) {
224 PERROR("redis timeout %p - Failed adding timeout", h);
225 }
226}
227#endif
228
229/** Handle freeing the redisAsyncContext
230 *
231 * delRead and delWrite don't seem to be called when the redisAsyncContext is freed
232 *
233 * As the IO events must be removed from the event loop *before* the FD is closed
234 * and as the IO events will only be automatically de-registered when when the
235 * fr_redis_handle_t is freed.
236 *
237 * Unfortunately the destructor for the fr_redis_handle_t will be run before
238 * the IO events are de-registered, which'll free the redisAsycCtx, which'll close
239 * the FD.
240 *
241 * This means there'd be a brief period of time between the FD is closed, and
242 * it being removed from the event loop.
243 *
244 * We use the cleanup callback (which is called before the FD is closed) to remove
245 * the events now, and ensure there's no chance of issues.
246 */
247static void _redis_io_free(void *uctx)
248{
249 connection_t *conn = talloc_get_type_abort(uctx, connection_t);
250 fr_redis_handle_t *h = conn->h;
251
252 DEBUG4("redis handle %p - Freed", h);
253
254 _redis_io_common(conn, h, false, false);
255}
256
257/** Configures async I/O callbacks for an existing redisAsyncContext
258 *
259 */
260static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
261{
262 if (ac->ev.data != NULL) return REDIS_ERR;
263
264 ac->ev.addRead = _redis_io_add_read;
265 ac->ev.delRead = _redis_io_del_read;
266 ac->ev.addWrite = _redis_io_add_write;
267 ac->ev.delWrite = _redis_io_del_write;
268#ifdef HAVE_REDIS_TIMEOUT
269 ac->ev.scheduleTimer = _redis_io_timer_modify;
270#endif
271 ac->ev.cleanup = _redis_io_free;
272 memcpy(&ac->ev.data, &conn, sizeof(ac->ev.data));
273
274 return REDIS_OK;
275}
276
277/** Free the redis async context when the handle is freed
278 *
279 */
281{
282 /*
283 * Don't fire the reconnect callback if we're
284 * freeing the handle.
285 */
286 h->ignore_disconnect_cb = true;
287 if (h->ac) redisAsyncFree(h->ac);
288
289 return 0;
290}
291
292/** Callback for the initialise state
293 *
294 * Should attempt to open a non-blocking connection and return it in h_out.
295 *
296 * @param[out] h_out Where to write the new handle
297 * @param[in] conn This connection. Opaque, should only be used for
298 * signalling the connection state machine.
299 * @param[in] uctx User context.
300 * @return
301 * - #CONNECTION_STATE_CONNECTING if a file descriptor was successfully created.
302 * - #CONNECTION_STATE_FAILED if we could not open a valid handle.
303 */
304CC_NO_UBSAN(function) /* UBSAN: false positive - public vs private connection_t trips --fsanitize=function*/
305static connection_state_t _redis_io_connection_init(void **h_out, connection_t *conn, void *uctx)
306{
307 fr_redis_io_conf_t *conf = uctx;
308 char const *host = conf->hostname;
309 uint16_t port = conf->port;
311 int ret;
312
313 /*
314 * Allocate a structure to wrap the
315 * redis async context.
316 */
317 MEM(h = talloc_zero(conn, fr_redis_handle_t));
318 talloc_set_destructor(h, _redis_handle_free);
319
320 h->ac = redisAsyncConnect(host, port);
321 if (!h->ac) {
322 ERROR("Failed allocating handle for %s:%u", host, port);
324 }
325
326 if (h->ac->err) {
327 ERROR("Failed allocating handle for %s:%u: %s", host, port, h->ac->errstr);
328 error:
329 redisAsyncFree(h->ac);
331 }
332
333 /*
334 * Store the connection in private data,
335 * so we can use it for signalling.
336 *
337 * Comments in the redis src indicate
338 * it doesn't mess with this.
339 */
340 memcpy(&h->ac->data, &conn, sizeof(h->ac->data));
341
342 /*
343 * Handle has to be associated with the
344 * conn in case I/O handlers want to get
345 * at it.
346 */
347 *h_out = h;
348
349 /*
350 * Install the I/O service functions
351 *
352 * Event library must be set first
353 * before calling redisAsyncSetConnectCallback
354 * as just setting the callback triggers
355 * I/O events to be registered.
356 */
357 fr_redis_io_setup(h->ac, conn);
358
359 /*
360 * Setup callbacks so we're notified
361 * when the connection state changes.
362 *
363 * We then signal the connection state
364 * machine, to let it handle
365 * reconnecting.
366 */
367 ret = redisAsyncSetConnectCallback(h->ac, _redis_connected);
368 if (ret != REDIS_OK) {
369 ERROR("Failed setting connected callback: Error %i", ret);
370 goto error;
371 }
372
373 ret = redisAsyncSetDisconnectCallback(h->ac, _redis_disconnected);
374 if (ret != REDIS_OK) {
375 ERROR("Failed setting disconnected callback: Error %i", ret);
376 goto error;
377 }
378
380
382}
383
384/** Gracefully signal that the connection should shutdown
385 *
386 */
388{
389 fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
390
391 redisAsyncDisconnect(our_h->ac); /* Should not free the handle */
392
394}
395
396/** Notification that the connection has errored and must be closed
397 *
398 * This should be used to close the file descriptor. It is assumed
399 * that the file descriptor is invalid after this callback has been executed.
400 *
401 * If this callback does not close the file descriptor, the server will leak
402 * file descriptors.
403 *
404 * @param[in] el to remove event handlers from.
405 * @param[in] h to close.
406 * @param[in] uctx User context.
407 */
409{
410 fr_redis_handle_t *our_h = talloc_get_type_abort(h, fr_redis_handle_t);
411
412 /*
413 * The destructor will free the redisAsyncContext
414 * which'll close the connection, after removing
415 * it from the event loop.
416 */
417 talloc_free(our_h);
418}
419
420/** Allocate an async redis I/O connection
421 *
422 */
424 connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf,
425 char const *log_prefix)
426{
427 connection_t *conn;
428 /*
429 * We don't specify an open callback
430 * as hiredis handles switching over
431 * all the I/O handlers internally
432 * within hiredis, and calls us when
433 * the connection is open.
434 */
435 conn = connection_alloc(ctx, el,
440 },
441 conn_conf,
442 log_prefix,
443 io_conf);
444 if (!conn) return NULL;
445
446 return conn;
447}
448
449/** Return the redisAsyncContext associated with the connection
450 *
451 * This is needed to issue commands to the redis server.
452 *
453 * @param[in] conn To retrieve async ctx from.
454 * @return The async ctx.
455 */
457{
458 fr_redis_handle_t *h = conn->h;
459 return h->ac;
460}
#define CC_NO_UBSAN(_sanitize)
Definition build.h:449
#define UNUSED
Definition build.h:336
connection_state_t
Definition connection.h:47
@ CONNECTION_STATE_FAILED
Connection has failed.
Definition connection.h:56
@ CONNECTION_STATE_CONNECTING
Waiting for connection to establish.
Definition connection.h:52
@ CONNECTION_STATE_SHUTDOWN
Connection is shutting down.
Definition connection.h:55
@ CONNECTION_FAILED
Connection is being reconnected because it failed.
Definition connection.h:85
Holds a complete set of functions for a connection.
Definition connection.h:195
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:257
#define fr_event_fd_insert(...)
Definition event.h:247
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:83
talloc_free(hp)
static void _redis_io_service_errored(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, int fd_errno, void *uctx)
Redis FD errored - Automatically removes registered events.
Definition io.c:97
static void _redis_io_del_write(void *uctx)
De-register FD for writes.
Definition io.c:187
redisAsyncContext * fr_redis_connection_get_async_ctx(connection_t *conn)
Return the redisAsyncContext associated with the connection.
Definition io.c:456
static void _redis_io_del_read(void *uctx)
De-register FD for reads.
Definition io.c:165
static int _redis_handle_free(fr_redis_handle_t *h)
Free the redis async context when the handle is freed.
Definition io.c:280
static void _redis_io_add_write(void *uctx)
Register FD for writes.
Definition io.c:176
static connection_state_t _redis_io_connection_shutdown(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Gracefully signal that the connection should shutdown.
Definition io.c:387
static void _redis_io_service_writable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became writable.
Definition io.c:84
static connection_state_t _redis_io_connection_init(void **h_out, connection_t *conn, void *uctx)
Callback for the initialise state.
Definition io.c:305
static void _redis_io_free(void *uctx)
Handle freeing the redisAsyncContext.
Definition io.c:247
static void _redis_io_service_readable(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Redis FD became readable.
Definition io.c:71
static void _redis_connected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is live.
Definition io.c:59
connection_t * fr_redis_connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_conf_t const *conn_conf, fr_redis_io_conf_t const *io_conf, char const *log_prefix)
Allocate an async redis I/O connection.
Definition io.c:423
static void _redis_io_add_read(void *uctx)
Register FD for reads.
Definition io.c:154
static int fr_redis_io_setup(redisAsyncContext *ac, connection_t const *conn)
Configures async I/O callbacks for an existing redisAsyncContext.
Definition io.c:260
static void _redis_disconnected(redisAsyncContext const *ac, UNUSED int status)
Called by hiredis to indicate the connection is dead.
Definition io.c:35
static void _redis_io_common(connection_t *conn, fr_redis_handle_t *h, bool read, bool write)
Deal with the method hiredis uses to register/unregister interest in a file descriptor.
Definition io.c:114
static void _redis_io_connection_close(UNUSED fr_event_list_t *el, void *h, UNUSED void *uctx)
Notification that the connection has errored and must be closed.
Definition io.c:408
bool read_set
We're listening for reads.
Definition io.h:67
redisAsyncContext * ac
Async handle for hiredis.
Definition io.h:74
fr_dlist_head_t ignore
Contains SQNs for responses that should be ignored.
Definition io.h:76
bool ignore_disconnect_cb
Ensure that redisAsyncFree doesn't cause a callback loop.
Definition io.h:69
bool write_set
We're listening for writes.
Definition io.h:68
Store I/O state.
Definition io.h:66
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG4(_fmt,...)
Definition log.h:267
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1203
Stores all information relating to an event list.
Definition event.c:377
unsigned short uint16_t
static rs_t * conf
Definition radsniff.c:52
void connection_signal_reconnect(connection_t *conn, connection_reason_t reason)
Asynchronously signal the connection should be reconnected.
connection_t * connection_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, connection_funcs_t const *funcs, connection_conf_t const *conf, char const *log_prefix, void const *uctx)
Allocate a new connection.
void connection_signal_connected(connection_t *conn)
Asynchronously signal that the connection is open.
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define talloc_get_type_abort_const
Definition talloc.h:110
static fr_time_delta_t fr_time_delta_from_timeval(struct timeval const *tv)
Definition time.h:597
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
#define fr_timer_in(...)
Definition timer.h:87
static fr_event_list_t * el
#define fr_box_time_delta(_val)
Definition value.h:366