The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
udp_queue.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/*
18 * $Id: 2c31f0cda126ffa7b0a19de62eb0b712c6f6743f $
19 *
20 * @file src/lib/server/udp_queue.c
21 * @brief Handle queues of outgoing UDP packets
22 *
23 * @author Alan DeKok (aland@freeradius.org)
24 *
25 * @copyright 2021 Network RADIUS SAS (legal@networkradius.com)
26 */
27RCSID("$Id: 2c31f0cda126ffa7b0a19de62eb0b712c6f6743f $")
28
29#include <freeradius-devel/util/debug.h>
30#include <freeradius-devel/util/syserror.h>
31
32#include <freeradius-devel/util/socket.h>
33#include <freeradius-devel/util/udp_queue.h>
34
36 fr_udp_queue_config_t const *config; //!< configuration
37 fr_dlist_head_t queue; //!< list of queued packets to write, ordered by time
38
40 int fd;
41 int port;
42 bool blocked; //!< are we blocked?
43
45};
46
47typedef struct {
48 struct sockaddr_storage sockaddr;
49 socklen_t socklen;
50
53
54 void *rctx;
55
57
58 size_t packet_len;
59 uint8_t packet[];
61
63{
65 talloc_free(entry);
66 }}
67
68 close(uq->fd);
69
70 return 0;
71}
72
74{
75 fr_udp_queue_t *uq = entry->uq;
76 void *rctx = entry->rctx;
77
78 fr_dlist_remove(&uq->queue, entry);
79
80 if (uq->resume) uq->resume(false, rctx);
81
82 return 0;
83}
84
85/** Allocate an outbound UDP queue.
86 *
87 * @param ctx where the structure will be allocated.
88 * @param config containing the IPs, ports, etc
89 * @param el the event list for adding events to see if the socket is writable
90 * @param resume the function to call after a delayed packet has been written
91 * @return
92 * - NULL on error
93 * - !NULL on success
94 */
97{
99 int fd;
100 fr_ipaddr_t ipaddr = config->ipaddr;
101 uint16_t port = config->port;
102
103 /*
104 * Open the socket.
105 */
106 fd = fr_socket_server_udp(&config->ipaddr, &port, NULL, false);
107 if (fd < 0) return NULL;
108
109 /*
110 * Set SO_REUSEPORT if we're binding to a specific port
111 * (e.g. DHCP), so that multiple threads can use the same
112 * port.
113 */
114 if (config->port != 0) {
115 int on = 1;
116
117 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) < 0) {
118 fr_strerror_printf("SO_REUSEPORT said %s", fr_syserror(errno));
119 goto error;
120 }
121 }
122
123 /*
124 * Bind to the given interface.
125 */
126 if (config->interface &&
127 (fr_socket_bind(fd, config->interface, &ipaddr, &port) < 0)) goto error;
128
129#ifdef SO_SNDBUF
130 /*
131 * Set SO_SNDBUF size, if configured to do so.
132 */
133 if (config->send_buff_is_set) {
134 int opt;
135
136 opt = config->send_buff;
137
138 if (opt < 65536) opt = 65536;
139 if (opt > (1 << 30)) opt = 1<<30;
140
141 (void) setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(int));
142 }
143#endif
144
145#ifdef SO_RCVBUF
146 /*
147 * Set SO_RCVBUF to 4K, so that the kernel will quickly
148 * drop incoming packets. We don't expect replies, and
149 * we never check the socket for readability, so this is
150 * fine.
151 */
152 {
153 int opt = 4096;
154
155 (void) setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(int));
156 }
157#endif
158
159 uq = talloc_zero(ctx, fr_udp_queue_t);
160 if (!uq) {
161 error:
162 close(fd);
163 return NULL;
164 }
165
166 *uq = (fr_udp_queue_t) {
167 .config = config,
168 .el = el,
169 .fd = fd,
170 .port = port,
171 .resume = resume,
172 };
173
175
176 talloc_set_destructor(uq, _udp_queue_free);
177
178 return uq;
179}
180
181/** If the socket is writable, then flush packets until either it
182 * returns EWOULDBLOCK, or there are no more packets to write.
183 *
184 */
186 UNUSED int flags, void *uctx)
187{
188 fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
189 fr_time_t now = fr_time();
190
192 ssize_t rcode;
193 int retries = 0;
194
195 /*
196 * If the entry is expired, tell the caller that
197 * it wasn't written to the socket.
198 */
199 if (fr_time_gteq(now, entry->expires)) {
200 void *rctx = entry->rctx;
201
202 talloc_free(entry);
203 if (uq->resume) uq->resume(false, rctx);
204 continue;
205 }
206
207 retry:
208 rcode = sendto(uq->fd, entry->packet, entry->packet_len, 0, (struct sockaddr *) &entry->sockaddr, entry->socklen);
209 if (rcode >= 0) {
210 void *rctx = entry->rctx;
211
212 talloc_free(entry);
213 if (uq->resume) uq->resume(true, rctx);
214 continue;
215 }
216
217 if (rcode < 0) {
218 if (errno == EINTR) {
219 if (retries++ < 3) goto retry;
220 return;
221 }
222
223#if EWOULDBLOCK != EAGAIN
224 if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return;
225#else
226 if (errno != EWOULDBLOCK) return;
227#endif
228 }
229 }}
230
231 /*
232 * Nothing more to write, delete the IO handler so that we don't get extraneous signals.
233 */
234 if (fr_dlist_num_elements(&uq->queue) == 0) {
235 fr_event_fd_delete(uq->el, uq->fd, FR_EVENT_FILTER_IO);
236 uq->blocked = false;
237 }
238}
239
240/** Write packet to socket, OR enqueue it if we get EAGAIN
241 *
242 * In most cases, the packet will get written to the socket immediately.
243 *
244 * However, if the socket is blocked, then the packet is added to an
245 * outbound queue. When the socket becomes unblocked, the packets
246 * will be sent.
247 *
248 * @param ctx the talloc context for this packet to be saved in, usually request_t
249 * @param uq the local queue to write it to
250 * @param packet the packet to write
251 * @param packet_len how long the packet is
252 * @param ipaddr the IP address we're sending the packet to
253 * @param port the port we're sending the packet to
254 * @param rctx for resumption, usually request_t, or a structure which holds a request_t
255 * @return
256 * - <0 for error
257 * - 0 for "didn't write it to socket, but added it to the queue, and the caller should yield"
258 * - 1 for "wrote it to the socket, you're good to go".
259 */
260int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packet, size_t packet_len,
261 fr_ipaddr_t const *ipaddr, int port, void *rctx)
262{
263 struct sockaddr_storage sockaddr;
264 socklen_t socklen;
266
267 fr_ipaddr_to_sockaddr(&sockaddr, &socklen, ipaddr, port);
268
269 if (!packet_len || !port) return 1;
270
271 if (!uq->blocked) {
272 int retries = 0;
273 ssize_t rcode;
274
275retry:
276 rcode = sendto(uq->fd, packet, packet_len, 0, (struct sockaddr *) &sockaddr, socklen);
277 if (rcode >= 0) return 1;
278
279 if (rcode < 0) {
280 if (errno == EINTR) {
281 if (retries++ < 3) goto retry;
282 return -1;
283 }
284
285#if EWOULDBLOCK != EAGAIN
286 if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return -1;
287#else
288 if (errno != EWOULDBLOCK) return -1;
289#endif
290 }
291
292 /*
293 */
294 if (fr_event_fd_insert(uq, NULL, uq->el, uq->fd, NULL,
295 udp_queue_writable, NULL, uq) < 0) {
296 return -1;
297 }
298
299 uq->blocked = true;
300 }
301
302 /*
303 * Limit the number of packets in the queue.
304 */
305 if (uq->config->max_queued_packets &&
307 return -1;
308 }
309
310 entry = (fr_udp_queue_entry_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_udp_queue_entry_t) + packet_len);
311 if (!entry) return -1;
312
313 talloc_set_type(entry, fr_udp_queue_entry_t);
314 talloc_set_destructor(entry, _udp_queue_entry_free);
315
316 *entry = (fr_udp_queue_entry_t) {
317 .sockaddr = sockaddr,
318 .socklen = socklen,
319 .uq = uq,
320 .expires = fr_time_add(fr_time(), uq->config->max_queued_time),
321 .rctx = rctx,
322 .packet_len = packet_len,
323 };
324
325 memcpy(entry->packet, packet, packet_len);
326 fr_dlist_insert_tail(&uq->queue, entry);
327
328 /*
329 * Didn't do anything, say so.
330 */
331
332 return 0;
333}
#define RCSID(id)
Definition build.h:483
#define UNUSED
Definition build.h:315
static int retries
Definition dhcpclient.c:53
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:638
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
#define fr_dlist_foreach_safe(_list_head, _type, _iter)
Iterate over the contents of a list allowing for removals.
Definition dlist.h:108
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
#define fr_event_fd_insert(...)
Definition event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:62
int fr_ipaddr_to_sockaddr(struct sockaddr_storage *sa, socklen_t *salen, fr_ipaddr_t const *ipaddr, uint16_t port)
Convert our internal ip address representation to a sockaddr.
Definition inet.c:1392
IPv4/6 prefix.
talloc_free(reap)
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1260
Stores all information relating to an event list.
Definition event.c:411
unsigned short uint16_t
long int ssize_t
unsigned char uint8_t
static const conf_parser_t config[]
Definition base.c:183
int fr_socket_server_udp(fr_ipaddr_t const *src_ipaddr, uint16_t *src_port, char const *port_name, bool async)
Open an IPv4/IPv6 unconnected UDP socket.
Definition socket.c:867
int fr_socket_bind(int sockfd, char const *ifname, fr_ipaddr_t *src_ipaddr, uint16_t *src_port)
Bind a UDP/TCP v4/v6 socket to a given ipaddr src port, and interface.
Definition socket.c:229
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_time_gteq(_a, _b)
Definition time.h:238
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
"server local" time.
Definition time.h:69
static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
If the socket is writable, then flush packets until either it returns EWOULDBLOCK,...
Definition udp_queue.c:185
bool blocked
are we blocked?
Definition udp_queue.c:42
int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packet, size_t packet_len, fr_ipaddr_t const *ipaddr, int port, void *rctx)
Write packet to socket, OR enqueue it if we get EAGAIN.
Definition udp_queue.c:260
static int _udp_queue_free(fr_udp_queue_t *uq)
Definition udp_queue.c:62
struct sockaddr_storage sockaddr
Definition udp_queue.c:48
fr_dlist_head_t queue
list of queued packets to write, ordered by time
Definition udp_queue.c:37
fr_udp_queue_t * uq
Definition udp_queue.c:51
fr_event_list_t * el
Definition udp_queue.c:39
close(uq->fd)
fr_udp_queue_resume_t resume
Definition udp_queue.c:44
static int _udp_queue_entry_free(fr_udp_queue_entry_t *entry)
Definition udp_queue.c:73
fr_udp_queue_t * fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t const *config, fr_event_list_t *el, fr_udp_queue_resume_t resume)
Allocate an outbound UDP queue.
Definition udp_queue.c:95
fr_udp_queue_config_t const * config
configuration
Definition udp_queue.c:36
Definition udp_queue.c:47
uint32_t max_queued_packets
maximum queued packets
Definition udp_queue.h:44
void(* fr_udp_queue_resume_t)(bool written, void *rctx)
Definition udp_queue.h:53
struct fr_udp_queue_s fr_udp_queue_t
Definition udp_queue.h:51
fr_time_delta_t max_queued_time
maximum time a packet can be queued
Definition udp_queue.h:42
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64