The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
udp_queue.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /*
18  * $Id: 2c31f0cda126ffa7b0a19de62eb0b712c6f6743f $
19  *
20  * @file src/lib/server/udp_queue.c
21  * @brief Handle queues of outgoing UDP packets
22  *
23  * @author Alan DeKok (aland@freeradius.org)
24  *
25  * @copyright 2021 Network RADIUS SAS (legal@networkradius.com)
26  */
27 RCSID("$Id: 2c31f0cda126ffa7b0a19de62eb0b712c6f6743f $")
28 
29 #include <freeradius-devel/util/debug.h>
30 #include <freeradius-devel/util/syserror.h>
31 
32 #include <freeradius-devel/util/socket.h>
33 #include <freeradius-devel/util/udp_queue.h>
34 
36  fr_udp_queue_config_t const *config; //!< configuration
37  fr_dlist_head_t queue; //!< list of queued packets to write, ordered by time
38 
40  int fd;
41  int port;
42  bool blocked; //!< are we blocked?
43 
45 };
46 
47 typedef struct {
48  struct sockaddr_storage sockaddr;
49  socklen_t socklen;
50 
53 
54  void *rctx;
55 
57 
58  size_t packet_len;
59  uint8_t packet[];
61 
63 {
65  talloc_free(entry);
66  }}
67 
68  close(uq->fd);
69 
70  return 0;
71 }
72 
74 {
75  fr_udp_queue_t *uq = entry->uq;
76  void *rctx = entry->rctx;
77 
78  fr_dlist_remove(&uq->queue, entry);
79 
80  if (uq->resume) uq->resume(false, rctx);
81 
82  return 0;
83 }
84 
85 /** Allocate an outbound UDP queue.
86  *
87  * @param ctx where the structure will be allocated.
88  * @param config containing the IPs, ports, etc
89  * @param el the event list for adding events to see if the socket is writable
90  * @param resume the function to call after a delayed packet has been written
91  * @return
92  * - NULL on error
93  * - !NULL on success
94  */
96  fr_udp_queue_resume_t resume)
97 {
98  fr_udp_queue_t *uq;
99  int fd;
100  fr_ipaddr_t ipaddr = config->ipaddr;
101  uint16_t port = config->port;
102 
103  /*
104  * Open the socket.
105  */
106  fd = fr_socket_server_udp(&config->ipaddr, &port, NULL, false);
107  if (fd < 0) return NULL;
108 
109  /*
110  * Set SO_REUSEPORT if we're binding to a specific port
111  * (e.g. DHCP), so that multiple threads can use the same
112  * port.
113  */
114  if (config->port != 0) {
115  int on = 1;
116 
117  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) < 0) {
118  fr_strerror_printf("SO_REUSEPORT said %s", fr_syserror(errno));
119  goto error;
120  }
121  }
122 
123  /*
124  * Bind to the given interface.
125  */
126  if (config->interface &&
127  (fr_socket_bind(fd, config->interface, &ipaddr, &port) < 0)) goto error;
128 
129 #ifdef SO_SNDBUF
130  /*
131  * Set SO_SNDBUF size, if configured to do so.
132  */
133  if (config->send_buff_is_set) {
134  int opt;
135 
136  opt = config->send_buff;
137 
138  if (opt < 65536) opt = 65536;
139  if (opt > (1 << 30)) opt = 1<<30;
140 
141  (void) setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(int));
142  }
143 #endif
144 
145 #ifdef SO_RCVBUF
146  /*
147  * Set SO_RECVBUFF to 4K, so that the kernel will quickly
148  * drop incoming packets. We don't expect replies, and
149  * we never check the socket for readability, so this is
150  * fine.
151  */
152  {
153  int opt = 4096;
154 
155  (void) setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, sizeof(int));
156  }
157 #endif
158 
159  uq = talloc_zero(ctx, fr_udp_queue_t);
160  if (!uq) {
161  error:
162  close(fd);
163  return NULL;
164  }
165 
166  *uq = (fr_udp_queue_t) {
167  .config = config,
168  .el = el,
169  .fd = fd,
170  .port = port,
171  .resume = resume,
172  };
173 
175 
176  talloc_set_destructor(uq, _udp_queue_free);
177 
178  return uq;
179 }
180 
181 /** If the socket is writable, then flush packets until either it
182  * returns EWOULDBLOCK, or there are no more packets to write.
183  *
184  */
186  UNUSED int flags, void *uctx)
187 {
188  fr_udp_queue_t *uq = talloc_get_type_abort(uctx, fr_udp_queue_t);
189  fr_time_t now = fr_time();
190 
192  ssize_t rcode;
193  int retries = 0;
194 
195  /*
196  * If the entry is expired, tell the caller that
197  * it wasn't written to the socket.
198  */
199  if (fr_time_gteq(now, entry->expires)) {
200  void *rctx = entry->rctx;
201 
202  talloc_free(entry);
203  if (uq->resume) uq->resume(false, rctx);
204  continue;
205  }
206 
207  retry:
208  rcode = sendto(uq->fd, entry->packet, entry->packet_len, 0, (struct sockaddr *) &entry->sockaddr, entry->socklen);
209  if (rcode >= 0) {
210  void *rctx = entry->rctx;
211 
212  talloc_free(entry);
213  if (uq->resume) uq->resume(true, rctx);
214  continue;
215  }
216 
217  if (rcode < 0) {
218  if (errno == EINTR) {
219  if (retries++ < 3) goto retry;
220  return;
221  }
222 
223 #if EWOULDBLOCK != EAGAIN
224  if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return;
225 #else
226  if (errno != EWOULDBLOCK) return;
227 #endif
228  }
229  }}
230 
231  /*
232  * Nothing more to write, delete the IO handler so that we don't get extraneous signals.
233  */
234  if (fr_dlist_num_elements(&uq->queue) == 0) {
235  fr_event_fd_delete(uq->el, uq->fd, FR_EVENT_FILTER_IO);
236  uq->blocked = false;
237  }
238 }
239 
240 /** Write packet to socket, OR enqueue it if we get EAGAIN
241  *
242  * In most cases, the packet will get written to the socket immediately.
243  *
244  * However, if the socket is blocked, then the packet is added to an
245  * outbound queue. When the socket becomes unblocked, the packets
246  * will be sent.
247  *
248  * @param ctx the talloc context for this packet to be saved in, usually request_t
249  * @param uq the local queue to write it to
250  * @param packet the packet to write
251  * @param packet_len how long the packet is
252  * @param ipaddr the IP address we're sending the packet to
253  * @param port the port we're sending the packet to
254  * @param rctx for resumption, usually request_t, or a structure which holds a request_t
255  * @return
256  * - <0 for error
257  * - 0 for "didn't write it to socket, but added it to the queue, and the caller should yield"
258  * - 1 for "wrote it to the socket, you're good to go".
259  */
260 int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packet, size_t packet_len,
261  fr_ipaddr_t const *ipaddr, int port, void *rctx)
262 {
263  struct sockaddr_storage sockaddr;
264  socklen_t socklen;
265  fr_udp_queue_entry_t *entry;
266 
267  fr_ipaddr_to_sockaddr(&sockaddr, &socklen, ipaddr, port);
268 
269  if (!packet_len || !port) return 1;
270 
271  if (!uq->blocked) {
272  int retries = 0;
273  ssize_t rcode;
274 
275 retry:
276  rcode = sendto(uq->fd, packet, packet_len, 0, (struct sockaddr *) &sockaddr, socklen);
277  if (rcode >= 0) return 1;
278 
279  if (rcode < 0) {
280  if (errno == EINTR) {
281  if (retries++ < 3) goto retry;
282  return -1;
283  }
284 
285 #if EWOULDBLOCK != EAGAIN
286  if (!((errno == EWOULDBLOCK) || (errno == EAGAIN))) return -1;
287 #else
288  if (errno != EWOULDBLOCK) return -1;
289 #endif
290  }
291 
292  /*
293  */
294  if (fr_event_fd_insert(uq, NULL, uq->el, uq->fd, NULL,
295  udp_queue_writable, NULL, uq) < 0) {
296  return -1;
297  }
298 
299  uq->blocked = true;
300  }
301 
302  /*
303  * Limit the number of packets in the queue.
304  */
305  if (uq->config->max_queued_packets &&
307  return -1;
308  }
309 
310  entry = (fr_udp_queue_entry_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_udp_queue_entry_t) + packet_len);
311  if (!entry) return -1;
312 
313  talloc_set_type(entry, fr_udp_queue_entry_t);
314  talloc_set_destructor(entry, _udp_queue_entry_free);
315 
316  *entry = (fr_udp_queue_entry_t) {
317  .sockaddr = sockaddr,
318  .socklen = socklen,
319  .uq = uq,
320  .expires = fr_time_add(fr_time(), uq->config->max_queued_time),
321  .rctx = rctx,
322  .packet_len = packet_len,
323  };
324 
325  memcpy(entry->packet, packet, packet_len);
326  fr_dlist_insert_tail(&uq->queue, entry);
327 
328  /*
329  * Didn't do anything, say so.
330  */
331 
332  return 0;
333 }
#define RCSID(id)
Definition: build.h:444
#define UNUSED
Definition: build.h:313
static int retries
Definition: dhcpclient.c:53
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
#define fr_dlist_foreach_safe(_list_head, _type, _iter)
Iterate over the contents of a list allowing for removals.
Definition: dlist.h:108
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
#define fr_event_fd_insert(...)
Definition: event.h:232
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition: event.h:62
int fr_ipaddr_to_sockaddr(struct sockaddr_storage *sa, socklen_t *salen, fr_ipaddr_t const *ipaddr, uint16_t port)
Convert our internal ip address representation to a sockaddr.
Definition: inet.c:1378
IPv4/6 prefix.
Definition: merged_model.c:272
talloc_free(reap)
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1253
Stores all information relating to an event list.
Definition: event.c:411
unsigned short uint16_t
Definition: merged_model.c:31
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
static const conf_parser_t config[]
Definition: base.c:188
int fr_socket_server_udp(fr_ipaddr_t const *src_ipaddr, uint16_t *src_port, char const *port_name, bool async)
Open an IPv4/IPv6 unconnected UDP socket.
Definition: socket.c:867
int fr_socket_bind(int sockfd, char const *ifname, fr_ipaddr_t *src_ipaddr, uint16_t *src_port)
Bind a UDP/TCP v4/v6 socket to a given ipaddr src port, and interface.
Definition: socket.c:229
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define fr_time_gteq(_a, _b)
Definition: time.h:238
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
"server local" time.
Definition: time.h:69
static void udp_queue_writable(UNUSED fr_event_list_t *el, UNUSED int fd, UNUSED int flags, void *uctx)
If the socket is writable, then flush packets until either it returns EWOULDBLOCK,...
Definition: udp_queue.c:185
bool blocked
are we blocked?
Definition: udp_queue.c:42
int fr_udp_queue_write(TALLOC_CTX *ctx, fr_udp_queue_t *uq, uint8_t const *packet, size_t packet_len, fr_ipaddr_t const *ipaddr, int port, void *rctx)
Write packet to socket, OR enqueue it if we get EAGAIN.
Definition: udp_queue.c:260
socklen_t socklen
Definition: udp_queue.c:49
static int _udp_queue_free(fr_udp_queue_t *uq)
Definition: udp_queue.c:62
struct sockaddr_storage sockaddr
Definition: udp_queue.c:48
fr_dlist_head_t queue
list of queued packets to write, ordered by time
Definition: udp_queue.c:37
fr_udp_queue_t * fr_udp_queue_alloc(TALLOC_CTX *ctx, fr_udp_queue_config_t const *config, fr_event_list_t *el, fr_udp_queue_resume_t resume)
Allocate an outbound UDP queue.
Definition: udp_queue.c:95
uint8_t packet[]
Definition: udp_queue.c:59
fr_time_t expires
Definition: udp_queue.c:56
fr_dlist_t dlist
Definition: udp_queue.c:52
fr_udp_queue_t * uq
Definition: udp_queue.c:51
fr_event_list_t * el
Definition: udp_queue.c:39
close(uq->fd)
fr_udp_queue_resume_t resume
Definition: udp_queue.c:44
static int _udp_queue_entry_free(fr_udp_queue_entry_t *entry)
Definition: udp_queue.c:73
fr_udp_queue_config_t const * config
configuration
Definition: udp_queue.c:36
Definition: udp_queue.c:47
uint32_t max_queued_packets
maximum queued packets
Definition: udp_queue.h:44
void(* fr_udp_queue_resume_t)(bool written, void *rctx)
Definition: udp_queue.h:53
struct fr_udp_queue_s fr_udp_queue_t
Definition: udp_queue.h:51
fr_time_delta_t max_queued_time
maximum time a packet can be queued
Definition: udp_queue.h:42
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64