The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
network.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 1d1d99b810f26926ab5d52d0bc81e46e97471227 $
19 *
20 * @brief Receiver of socket data, which sends messages to the workers.
21 * @file io/network.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: 1d1d99b810f26926ab5d52d0bc81e46e97471227 $")
26
27#define LOG_PREFIX nr->name
28
29#define LOG_DST nr->log
30
31#include <freeradius-devel/util/event.h>
32#include <freeradius-devel/util/rand.h>
33#include <freeradius-devel/util/rb.h>
34#include <freeradius-devel/util/syserror.h>
35#include <freeradius-devel/util/atexit.h>
36
37#include <freeradius-devel/io/channel.h>
38#include <freeradius-devel/io/listen.h>
39#include <freeradius-devel/io/network.h>
40#include <freeradius-devel/io/queue.h>
41
42#define MAX_WORKERS 64
43
44static _Thread_local fr_ring_buffer_t *fr_network_rb;
45
52
53/** Associate a worker thread with a network thread
54 *
55 */
56typedef struct {
57 fr_heap_index_t heap_id; //!< workers are in a heap
58 fr_time_delta_t cpu_time; //!< how much CPU time this worker has spent
59 fr_time_delta_t predicted; //!< predicted processing time for one packet
60
61 bool blocked; //!< is this worker blocked?
62
63 fr_channel_t *channel; //!< channel to the worker
64 fr_worker_t *worker; //!< worker pointer
67
68typedef struct {
69 fr_rb_node_t listen_node; //!< rbtree node for looking up by listener.
70 fr_rb_node_t num_node; //!< rbtree node for looking up by number.
71
72 fr_network_t *nr; //!< O(N) issues in talloc
73 int number; //!< unique ID
74 fr_heap_index_t heap_id; //!< for the sockets_by_num heap
75
76 fr_event_filter_t filter; //!< what type of filter it is
77
78 bool dead; //!< is it dead?
79 bool blocked; //!< is it blocked?
80
81 unsigned int outstanding; //!< number of outstanding packets sent to the worker
82 fr_listen_t *listen; //!< I/O ctx and functions.
83
84 fr_message_set_t *ms; //!< message buffers for this socket.
85 fr_channel_data_t *cd; //!< cached in case of allocation & read error
86 size_t leftover; //!< leftover data from a previous read
87 size_t written; //!< however much we did in a partial write
88
89 fr_channel_data_t *pending; //!< the currently pending partial packet
90 fr_heap_t *waiting; //!< packets waiting to be written
93
94/*
95 * We have an array of workers, so we can index the workers in
96 * O(1) time. remove the heap of "workers ordered by CPU time"
97 * when we send a packet to a worker, just update the predicted
98 * CPU time in place. when we receive a reply from a worker,
99 * just update the predicted CPU time in place.
100 *
101 * when we need to choose a worker, pick 2 at random, and then
102 * choose the one with the lower CPU time. For background, see
103 * "Power of Two-Choices" and
104 * https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
105 * https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
106 */
108 char const *name; //!< Network ID for logging.
109
110 pthread_t thread_id; //!< for self
111
112 bool suspended; //!< whether or not we're suspended.
113
114 fr_log_t const *log; //!< log destination
115 fr_log_lvl_t lvl; //!< debug log level
116
117 fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
118
119 fr_control_t *control; //!< the control plane
120
121 fr_ring_buffer_t *rb; //!< ring buffer for my control-plane messages
122
123 fr_event_list_t *el; //!< our event list
124
125 fr_heap_t *replies; //!< replies from the worker, ordered by priority / origin time
126
128
129 fr_rb_tree_t *sockets; //!< list of sockets we're managing, ordered by the listener
130 fr_rb_tree_t *sockets_by_num; //!< ordered by number;
131
132 int num_workers; //!< number of active workers
133 int num_blocked; //!< number of blocked workers
134 int num_pending_workers; //!< number of workers we're waiting to start.
135 int max_workers; //!< maximum number of allowed workers
136 int num_sockets; //!< actually a counter...
137
138 int signal_pipe[2]; //!< Pipe for signalling the worker in an orderly way.
139 ///< This is more deterministic than using async signals.
140
141 bool exiting; //!< are we exiting?
142
143 fr_network_config_t config; //!< configuration
145};
146
147static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx);
148static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx);
150static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx);
151
152static int8_t reply_cmp(void const *one, void const *two)
153{
154 fr_channel_data_t const *a = one, *b = two;
155 int ret;
156
157 ret = CMP(a->priority, b->priority);
158 if (ret != 0) return ret;
159
160 return fr_time_cmp(a->m.when, b->m.when);
161}
162
163static int8_t waiting_cmp(void const *one, void const *two)
164{
165 fr_channel_data_t const *a = one, *b = two;
166 int ret;
167
168 ret = CMP(a->priority, b->priority);
169 if (ret != 0) return ret;
170
171 return fr_time_cmp(a->reply.request_time, b->reply.request_time);
172}
173
174static int8_t socket_listen_cmp(void const *one, void const *two)
175{
176 fr_network_socket_t const *a = one, *b = two;
177
178 return CMP(a->listen, b->listen);
179}
180
181static int8_t socket_num_cmp(void const *one, void const *two)
182{
183 fr_network_socket_t const *a = one, *b = two;
184
185 return CMP(a->number, b->number);
186}
187
/** Free the memory allocated to the ring buffer
 *
 * The memory is released explicitly so that valgrind has nothing
 * to complain about.
 *
 * @param[in] arg	talloc chunk to free.
 * @return the return code of talloc_free().
 */
static int _fr_network_rb_free(void *arg)
{
	int rcode;

	rcode = talloc_free(arg);

	return rcode;
}
196
197/** Initialise thread local storage
198 *
199 * @return fr_ring_buffer_t for messages
200 */
202{
204
205 rb = fr_network_rb;
206 if (rb) return rb;
207
209 if (!rb) {
210 fr_perror("Failed allocating memory for network ring buffer");
211 return NULL;
212 }
213
215
216 return rb;
217}
218
219static inline bool is_network_thread(fr_network_t const *nr)
220{
221 return (pthread_equal(pthread_self(), nr->thread_id) != 0);
222}
223
225
226/** Add a fr_listen_t to a network
227 *
228 * @param nr the network
229 * @param li the listener
230 */
232{
234
235 /*
236 * Associate the protocol dictionary with the listener, so that the decode functions can check /
237 * use it.
238 *
239 * A virtual server may start off with a "dictionary" block, and therefore define a local
240 * dictionary. So the "root" dictionary of a virtual server may not be a protocol dict.
241 */
242 fr_assert(li->server_cs != NULL);
244
245 fr_assert(li->dict != NULL);
246
247 /*
248 * Skip a bunch of work if we're already in the network thread.
249 */
250 if (is_network_thread(nr) && !li->needs_full_setup) {
251 return fr_network_listen_add_self(nr, li);
252 }
253
254 rb = fr_network_rb_init();
255 if (!rb) return -1;
256
257 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
258}
259
260
261/** Delete a socket from a network. MUST be called only by the listener itself!.
262 *
263 * @param nr the network
264 * @param li the listener
265 */
267{
269
271
272 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
273 if (!s) return -1;
274
276
277 return 0;
278}
279
280/** Add a "watch directory" call to a network
281 *
282 * @param nr the network
283 * @param li the listener
284 */
286{
288
289 rb = fr_network_rb_init();
290 if (!rb) return -1;
291
292 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_DIRECTORY, &li, sizeof(li));
293}
294
295/** Add a worker to a network in a different thread
296 *
297 * @param nr the network
298 * @param worker the worker
299 */
301{
303
304 rb = fr_network_rb_init();
305 if (!rb) return -1;
306
307 (void) talloc_get_type_abort(nr, fr_network_t);
308 (void) talloc_get_type_abort(worker, fr_worker_t);
309
310 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_WORKER, &worker, sizeof(worker));
311}
312
313static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now);
314
315/** Add a worker to a network in the same thread
316 *
317 * @param nr the network
318 * @param worker the worker
319 */
321{
322 fr_network_worker_started_callback(nr, &worker, sizeof(worker), fr_time_wrap(0));
323}
324
325
326/** Signal the network to read from a listener
327 *
328 * @param nr the network
329 * @param li the listener to read from
330 */
332{
334
335 (void) talloc_get_type_abort(nr, fr_network_t);
337
338 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
339 if (!s) return;
340
341 /*
342 * Go read the socket.
343 */
344 fr_network_read(nr->el, s->listen->fd, 0, s);
345}
346
347
348/** Inject a packet for a listener to write
349 *
350 * @param nr the network
351 * @param li the listener where the packet is being injected
352 * @param packet the packet to be written
353 * @param packet_len the length of the packet
354 * @param packet_ctx The packet context to write
355 * @param request_time when the packet was received.
356 */
357void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len,
358 void *packet_ctx, fr_time_t request_time)
359{
360 fr_message_t *lm;
362
363 cd = (fr_channel_data_t) {
364 .m = (fr_message_t) {
366 .data_size = packet_len,
367 .when = request_time,
368 },
369
370 .channel = {
371 .heap_id = FR_HEAP_INDEX_INVALID,
372 },
373
374 .listen = li,
375 .priority = PRIORITY_NOW,
376 .reply.request_time = request_time,
377 };
378
379 memcpy(&cd.m.data, &packet, sizeof(packet)); /* const issues */
380 memcpy(&cd.packet_ctx, &packet_ctx, sizeof(packet_ctx)); /* const issues */
381
382 /*
383 * Localize the message and insert it into the heap of pending messages.
384 */
385 lm = fr_message_localize(nr, &cd.m, sizeof(cd));
386 if (!lm) return;
387
388 if (fr_heap_insert(&nr->replies, lm) < 0) {
389 fr_message_done(lm);
390 }
391}
392
393
394/** Inject a packet for a listener to read
395 *
396 * @param nr the network
397 * @param li the listener where the packet is being injected
398 * @param packet the packet to be injected
399 * @param packet_len the length of the packet
400 * @param recv_time when the packet was received.
401 * @return
402 * - <0 on error
403 * - 0 on success
404 */
405int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
406{
407 int rcode;
409 fr_network_inject_t my_inject;
410
411 /*
412 * Can't inject to injection-less destinations.
413 */
414 if (!li->app_io->inject) {
415 fr_strerror_const("Listener cannot accept injected packet");
416 return -1;
417 }
418
419 /*
420 * Avoid a bounce through the event loop if we're being called from the network thread.
421 */
422 if (is_network_thread(nr)) {
424
425 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
426 if (!s) {
427 fr_strerror_const("Listener was not found for injected packet");
428 return -1;
429 }
430
431 /*
432 * Inject the packet. The master.c mod_read() routine will then take care of avoiding
433 * IO, and instead return the packet to the network side.
434 */
435 if (li->app_io->inject(li, packet, packet_len, recv_time) == 0) {
436 (void) fr_network_read(nr->el, li->fd, 0, s);
437 }
438
439 return 0;
440 }
441
442 rb = fr_network_rb_init();
443 if (!rb) return -1;
444
445 my_inject.listen = li;
446 MEM(my_inject.packet = talloc_memdup(NULL, packet, packet_len));
447 my_inject.packet_len = packet_len;
448 my_inject.recv_time = recv_time;
449
450 rcode = fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
451 if (rcode < 0) talloc_free(my_inject.packet);
452
453 return rcode;
454}
455
458 { 0 }
459};
460
463 { 0 }
464};
465
468 { 0 }
469};
470
473 { 0 }
474};
475
477{
480
481 if (nr->suspended) return;
482
483 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
484 s != NULL;
485 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
487 }
488 nr->suspended = true;
489}
490
492{
495
496 if (!nr->suspended) return;
497
498 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
499 s != NULL;
500 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
502 }
503 nr->suspended = false;
504}
505
/*
 *	Smoothing used for a worker's predicted processing time.
 *
 *	RTT() evaluates to (_new + _old * (IALPHA - 1)) / IALPHA, so each new
 *	sample contributes 1/IALPHA of the result.
 */
#define IALPHA			(8)
#define RTT(_old, _new)		fr_time_delta_wrap( \
					(fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
508
509/** Callback which handles a message being received on the network side.
510 *
511 * @param[in] ctx the network
512 * @param[in] ch the channel that the message is on.
513 * @param[in] cd the message (if any) to start with
514 */
516{
517 fr_network_t *nr = ctx;
518 fr_network_worker_t *worker;
519
520 cd->channel.ch = ch;
521
522 /*
523 * Update stats for the worker.
524 */
526 worker->stats.out++;
527 worker->cpu_time = cd->reply.cpu_time;
528 if (!fr_time_delta_ispos(worker->predicted)) {
529 worker->predicted = cd->reply.processing_time;
530 } else {
531 worker->predicted = RTT(worker->predicted, cd->reply.processing_time);
532 }
533
534 /*
535 * Unblock the worker.
536 */
537 if (worker->blocked) {
538 worker->blocked = false;
539 nr->num_blocked--;
541 }
542
543 /*
544 * Ensure that heap insert works.
545 */
546 cd->channel.heap_id = FR_HEAP_INDEX_INVALID;
547 if (fr_heap_insert(&nr->replies, cd) < 0) {
548 fr_message_done(&cd->m);
549 fr_assert(0 == 1);
550 }
551}
552
553/** Handle a network control message callback for a channel
554 *
555 * This is called from the event loop when we get a notification
556 * from the event signalling pipe.
557 *
558 * @param[in] ctx the network
559 * @param[in] data the message
560 * @param[in] data_size size of the data
561 * @param[in] now the current time
562 */
563static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
564{
566 fr_channel_t *ch;
567 fr_network_t *nr = ctx;
568
569 ce = fr_channel_service_message(now, &ch, data, data_size);
570 DEBUG3("Channel %s",
571 fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
572 switch (ce) {
573 case FR_CHANNEL_ERROR:
574 return;
575
576 case FR_CHANNEL_EMPTY:
577 return;
578
579 case FR_CHANNEL_NOOP:
580 break;
581
583 fr_assert(ch != NULL);
584 while (fr_channel_recv_reply(ch));
585 break;
586
588 fr_assert(0 == 1);
589 break;
590
591 case FR_CHANNEL_OPEN:
592 fr_assert(0 == 1);
593 break;
594
595 case FR_CHANNEL_CLOSE:
596 {
597 fr_network_worker_t *w = talloc_get_type_abort(fr_channel_requestor_uctx_get(ch),
599 int i;
600
601 /*
602 * Remove this worker from the array
603 */
604 DEBUG3("Worker acked our close request");
605 for (i = 0; i < nr->num_workers; i++) {
606 if (nr->workers[i] == w) {
607 if (i == (nr->num_workers - 1)) break;
608
609 /*
610 * Close the hole...
611 */
612 memmove(&nr->workers[i], &nr->workers[i + 1],
613 (uint8_t *) &nr->workers[nr->num_workers] - (uint8_t *) &nr->workers[i + 1]);
614 break;
615 }
616 }
617 nr->num_workers--;
618 nr->workers[nr->num_workers] = NULL; /* over-write now unused pointer */
619 }
620 break;
621 }
622}
623
/*
 *	Number of packets counted in (_w)->stats.in which have not yet been
 *	counted in (_w)->stats.out.
 */
#define OUTSTANDING(_w)		((_w)->stats.in - (_w)->stats.out)
625
626/** Send a message on the "best" channel.
627 *
628 * @param nr the network
629 * @param cd the message we've received
630 */
632{
633 fr_network_worker_t *worker;
634
635 (void) talloc_get_type_abort(nr, fr_network_t);
636
637 if (!nr->num_workers) {
638 RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
639 "No workers are available");
640 return -1;
641 }
642
643retry:
644 if (nr->num_workers == 1) {
645 worker = nr->workers[0];
646 if (worker->blocked) {
647 RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
648 "In single-threaded mode and worker is blocked");
649 drop:
650 worker->stats.dropped++;
651 return -1;
652 }
653
654 } else if (nr->num_blocked == 0) {
655 int64_t cmp;
656 uint32_t one, two;
657
658 one = fr_rand() % nr->num_workers;
659 do {
660 two = fr_rand() % nr->num_workers;
661 } while (two == one);
662
663 /*
664 * Choose a worker based on minimizing the amount
665 * of future work it's being asked to do.
666 *
667 * If both workers have the same number of
668 * outstanding requests, then choose the worker
669 * which has used the least total CPU time.
670 */
671 cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
672 if (cmp < 0) {
673 worker = nr->workers[one];
674
675 } else if (cmp > 0) {
676 worker = nr->workers[two];
677
678 } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
679 worker = nr->workers[one];
680
681 } else {
682 worker = nr->workers[two];
683 }
684 } else {
685 int i;
686 uint64_t min_outstanding = UINT64_MAX;
687 fr_network_worker_t *found = NULL;
688
689 /*
690 * Some workers are blocked. Pick the worker
691 * with the least amount of future work to do.
692 */
693 for (i = 0; i < nr->num_workers; i++) {
694 uint64_t outstanding;
695
696 worker = nr->workers[i];
697 if (worker->blocked) continue;
698
699 outstanding = OUTSTANDING(worker);
700 if ((outstanding < min_outstanding) || !found) {
701 found = worker;
702 min_outstanding = outstanding;
703
704 } else if (outstanding == min_outstanding) {
705 /*
706 * Queue lengths are the same.
707 * Choose this worker if it's
708 * less busy than the previous one we found.
709 */
710 if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
711 found = worker;
712 }
713 }
714 }
715
716 if (!found) {
717 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - Couldn't find active worker, "
718 "%u/%u workers are blocked", nr->num_blocked, nr->num_workers);
719 return -1;
720 }
721
722 worker = found;
723 }
724
725 (void) talloc_get_type_abort(worker, fr_network_worker_t);
726
727 /*
728 * Too many outstanding packets for this worker. Drop
729 * the request.
730 *
731 * If the worker we've picked has too many outstanding
732 * packets, then we have either only one worker, in which
733 * case we should drop the packet. Or, we were unable to
734 * find a worker with smaller than max_outstanding
735 * packets. In which case all of the workers are likely
736 * at max_outstanding.
737 *
738 * In both cases, we should just drop the new packet.
739 */
740 fr_assert(worker->stats.in >= worker->stats.out);
741 if (nr->config.max_outstanding &&
742 (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
743 RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
744 goto drop;
745 }
746
747 /*
748 * Send the message to the channel. If we fail, drop the
749 * packet. The only reason for failure is that the
750 * worker isn't servicing its input queue. When that
751 * happens, we have no idea what to do, and the whole
752 * thing falls over.
753 */
754 if (fr_channel_send_request(worker->channel, cd) < 0) {
755 worker->stats.dropped++;
756 worker->blocked = true;
757 nr->num_blocked++;
758
759 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - %u/%u workers are blocked",
760 nr->num_blocked, nr->num_workers);
761
762 if (nr->num_blocked == nr->num_workers) {
764 return -1;
765 }
766 goto retry;
767 }
768
769 worker->stats.in++;
770
771 /*
772 * We're projecting that the worker will use more CPU
773 * time to process this request. The CPU time will be
774 * updated with a more accurate number when we receive a
775 * reply from this channel.
776 */
777 worker->cpu_time = fr_time_delta_add(worker->cpu_time, worker->predicted);
778
779 return 0;
780}
781
782
783/** Send a packet to the worker.
784 *
785 * MUST only be called from the network thread.
786 *
787 * @param nr the network
788 * @param parent the parent listener
789 * @param li the listener that the packet was "read" from. Can be "parent"
790 * @param buffer the packet to send
791 * @param buflen size of the packet to send
792 * @param recv_time of the packet
793 * @param packet_ctx for the packet
794 * @return
795 * - <0 on error
796 * - 0 on success
797 */
799 const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
800{
803
804 (void) talloc_get_type_abort(nr, fr_network_t);
806
807 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
808 if (!s) return -1;
809
811 if (!cd) return -1;
812
813 cd->listen = parent;
815 cd->packet_ctx = packet_ctx;
816 cd->request.recv_time = recv_time;
817 memcpy(cd->m.data, buffer, buflen);
818 cd->m.when = fr_time();
819
820 if (fr_network_send_request(nr, cd) < 0) {
822 fr_message_done(&cd->m);
823 nr->stats.dropped++;
824 s->stats.dropped++;
825 return -1;
826 }
827
828 s->outstanding++;
829 return 0;
830}
831
832/** Get the number of outstanding packets
833 *
834 * @param nr the network
835 * @param li the listener that the packet was "read" from
836 * @return
837 * - <0 on error
838 * - the number of outstanding packets
839*/
842
843 (void) talloc_get_type_abort(nr, fr_network_t);
845
846 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
847 if (!s) return -1;
848
849 return s->outstanding;
850}
851
852/*
853 * Mark it as dead, but DON'T free it until all of the replies
854 * have come in.
855 */
857{
858 int i;
859
860 if (s->dead) return;
861
862 s->dead = true;
863
864 /*
865 * This FD is no longer part of the event loop.
866 */
867 fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
868
869 for (i = 0; i < nr->max_workers; i++) {
870 if (!nr->workers[i]) continue;
871
872 (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
873 }
874
875 /*
876 * If there are no outstanding packets, then we can free
877 * it now.
878 */
879 if (!s->outstanding) {
880 talloc_free(s);
881 return;
882 }
883
884 /*
885 * There are still outstanding packets. Leave it in the
886 * socket tree, so that replies from the worker can find
887 * it. When we've received all of the replies, then
888 * fr_network_post_event() will clean up this socket.
889 */
890}
891
892/** Read a packet from the network.
893 *
894 * @param[in] el the event list.
895 * @param[in] sockfd the socket which is ready to read.
896 * @param[in] flags from kevent.
897 * @param[in] ctx the network socket context.
898 */
899static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
900{
901 int num_messages = 0;
902 fr_network_socket_t *s = ctx;
903 fr_network_t *nr = s->nr;
904 ssize_t data_size;
905 fr_channel_data_t *cd, *next;
906
907 if (!fr_cond_assert_msg(s->listen->fd == sockfd, "Expected listen->fd (%u) to be equal event fd (%u)",
908 s->listen->fd, sockfd)) return;
909
910 DEBUG3("Reading data from FD %u", sockfd);
911
912 if (!s->cd) {
914 if (!cd) {
915 ERROR("Failed allocating message size %zd! - Closing socket",
918 return;
919 }
920 } else {
921 cd = s->cd;
922 }
923
924 fr_assert(cd->m.data != NULL);
925
926next_message:
927 /*
928 * Poll this socket, but not too often. We have to go
929 * service other sockets, too.
930 */
931 if (num_messages > 16) {
932 s->cd = cd;
933 return;
934 }
935
937
938 /*
939 * Read data from the network.
940 *
941 * Return of 0 means "no data", which is fine for UDP.
942 * For TCP, if an underlying read() on the TCP socket
943 * returns 0, (which signals that the FD is no longer
944 * usable) this function should return -1, so that the
945 * network side knows that it needs to close the
946 * connection.
947 */
948 data_size = s->listen->app_io->read(s->listen, &cd->packet_ctx, &cd->request.recv_time,
949 cd->m.data, cd->m.rb_size, &s->leftover);
950 if (data_size == 0) {
951 /*
952 * Cache the message only when there are leftover
953 * bytes from a partial stream read. The buffer
954 * must be handed back on the next call so the
955 * stream can append to it.
956 *
957 * When app_io->read() calls fr_network_listen_send_packet()
958 * internally, that function calls fr_message_and_data_alloc()
959 * on the same message set. Because the message ring uses
960 * fr_ring_buffer_reserve() (which does not advance write_offset),
961 * the new alloc returns the same ring slot as our reservation.
962 * The memset inside message_reserve zeroes our struct, then the
963 * new message commits into that slot. We can detect this
964 * because cd->m.data_size is non-zero after the alloc commits.
965 *
966 * In the aliased case, cd now refers to the already-committed
967 * message that was dispatched to a worker. We must not reset
968 * or modify it; just clear s->cd so the next call reserves fresh.
969 *
970 * When neither leftover nor aliasing applies, the reservation
971 * is clean and unused; reset it explicitly so its ring-buffer
972 * slot is immediately reusable.
973 */
974 if (s->leftover) {
975 s->cd = cd;
976 } else if (cd->m.data_size != 0) {
977 s->cd = NULL;
978 } else {
980 s->cd = NULL;
981 }
982 return;
983 }
984
985 /*
986 * Error: close the connection, and remove the fr_listen_t
987 */
988 if (data_size < 0) {
989// fr_log(nr->log, L_DBG_ERR, "error from transport read on socket %d", sockfd);
991 return;
992 }
993 s->cd = NULL;
994
995 DEBUG3("Read %zd byte(s) from FD %u", data_size, sockfd);
996 if (s->listen->read_hexdump) HEXDUMP2(cd->m.data, data_size, "%s read ", s->listen->name);
997 nr->stats.in++;
998 s->stats.in++;
999
1000 /*
1001 * Initialize the rest of the fields of the channel data.
1002 *
1003 * We always use "now" as the time of the message, as the
1004 * packet MAY be a duplicate packet magically resurrected
1005 * from the past. i.e. If the read routines are doing
1006 * dedup, then they notice that the packet is a
1007 * duplicate. In that case, they send over a copy of the
1008 * packet, BUT with the original timestamp. This
1009 * information tells the worker that the packet is a
1010 * duplicate.
1011 */
1012 cd->m.when = fr_time();
1013 cd->listen = s->listen;
1014
1015 /*
1016 * Nothing in the buffer yet. Allocate room for one
1017 * packet.
1018 */
1019 if ((cd->m.data_size == 0) && (!s->leftover)) {
1020 (void) fr_message_and_data_commit(s->ms, &cd->m, data_size);
1021 next = NULL;
1022 } else {
1023 /*
1024 * There are leftover bytes in the buffer, feed
1025 * them to the next round of reading.
1026 */
1027 if (s->leftover) {
1028 next = (fr_channel_data_t *) fr_message_and_data_commit_with_leftover(s->ms, &cd->m, data_size, s->leftover,
1030 if (!next) {
1031 PERROR("Failed reserving partial packet.");
1032 // @todo - probably close the socket...
1033 fr_assert(0 == 1);
1034 }
1035 } else {
1036 (void) fr_message_and_data_commit(s->ms, &cd->m, data_size);
1037 next = NULL;
1038 }
1039 }
1040
1041 /*
1042 * Set the priority. Which incidentally also checks if
1043 * we're allowed to read this particular kind of packet.
1044 *
1045 * That check is because the app_io handlers just read
1046 * packets, and don't really have access to the parent
1047 * "list of allowed packet types". So we have to do the
1048 * work here in a callback.
1049 *
1050 * That should probably be fixed...
1051 */
1052 if (s->listen->app->priority) {
1053 int priority;
1054
1055 priority = s->listen->app->priority(s->listen->app_instance, cd->m.data, data_size);
1056 if (priority <= 0) goto discard;
1057
1058 cd->priority = priority;
1059 }
1060
1061 if (fr_network_send_request(nr, cd) < 0) {
1062 discard:
1063 talloc_free(cd->packet_ctx); /* not sure what else to do here */
1064 fr_message_done(&cd->m);
1065 nr->stats.dropped++;
1066 s->stats.dropped++;
1067
1068 } else {
1069 /*
1070 * One more packet sent to a worker.
1071 */
1072 s->outstanding++;
1073 }
1074
1075 /*
1076 * If there is a next message, go read it from the buffer.
1077 *
1078 * @todo - note that this calls read(), even if the
1079 * app_io has paused the reader. We likely want to be
1080 * able to check that, too. We might just remove this
1081 * "goto"...
1082 */
1083 if (next) {
1084 cd = next;
1085 num_messages++;
1086 goto next_message;
1087 }
1088}
1089
1090int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
1091{
1094
1095 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1096 if (!s) return -1;
1097
1098 cd = (fr_channel_data_t *) fr_message_and_data_alloc(s->ms, data_len);
1099 if (!cd) return -1;
1100
1101 s->stats.in++;
1102
1104
1105 cd->m.when = recv_time;
1106 cd->listen = li;
1107 cd->packet_ctx = packet_ctx;
1108
1109 memcpy(cd->m.data, data, data_len);
1110
1111 if (fr_network_send_request(nr, cd) < 0) {
1112 talloc_free(packet_ctx);
1113 fr_message_done(&cd->m);
1114 nr->stats.dropped++;
1115 s->stats.dropped++;
1116 return -1;
1117 }
1118
1119 /*
1120 * One more packet sent to a worker.
1121 */
1122 s->outstanding++;
1123 return 0;
1124}
1125
1126
1127/** Get a notification that a vnode changed
1128 *
1129 * @param[in] el the event list.
1130 * @param[in] sockfd the socket which is ready to read.
1131 * @param[in] fflags from kevent.
1132 * @param[in] ctx the network socket context.
1133 */
1134static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
1135{
1136 fr_network_socket_t *s = ctx;
1137 fr_network_t *nr = s->nr;
1138
1139 if (!fr_cond_assert(s->listen->fd == sockfd)) return;
1140
1141 DEBUG3("network vnode");
1142
1143 /*
1144 * Tell the IO handler that something has happened to the
1145 * file.
1146 */
1147 s->listen->app_io->vnode(s->listen, fflags);
1148}
1149
1150
1151/** Handle errors for a socket.
1152 *
1153 * @param[in] el the event list
1154 * @param[in] sockfd the socket which has a fatal error.
1155 * @param[in] flags returned by kevent.
1156 * @param[in] fd_errno returned by kevent.
1157 * @param[in] ctx the network socket context.
1158 */
1160 int fd_errno, void *ctx)
1161{
1162 fr_network_socket_t *s = ctx;
1163 fr_network_t *nr = s->nr;
1164
1165 if (s->listen->app_io->error) {
1166 s->listen->app_io->error(s->listen);
1167
1168 } else if (flags & EV_EOF) {
1169 DEBUG2("Socket %s closed by peer", s->listen->name);
1170
1171 } else {
1172 ERROR("Socket %s errored - %s", s->listen->name, fr_syserror(fd_errno));
1173 }
1174
1176}
1177
1178
1179/** Write packets to the network.
1180 *
 * Writes the pending message (if any) first, then drains the socket's
 * "waiting" heap.  On EWOULDBLOCK or a partial write the current message
 * is localized if necessary, stored in s->pending, and the function
 * returns.  A zero-byte write discards the message.  When the heap has
 * been drained, s->blocked is cleared.
 *
 * NOTE(review): several lines of this function (the declaration of cd,
 * the event filter updates, and the call made on the "dead" path) are
 * not visible in this listing.
 *
1181 * @param el the event list
1182 * @param sockfd the socket which is ready to write
1183 * @param flags returned by kevent.
1184 * @param ctx the network socket context.
1185 */
1186static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
1187{
1188 fr_network_socket_t *s = ctx;
1189 fr_listen_t *li = s->listen;
1190 fr_network_t *nr = s->nr;
1192
1193 (void) talloc_get_type_abort(nr, fr_network_t);
1194
1195 /*
1196 * Start with the currently pending message, and then
1197 * work through the priority heap.
1198 */
1199 if (s->pending) {
1200 cd = s->pending;
1201 s->pending = NULL;
1202
1203 } else {
1204 cd = fr_heap_pop(&s->waiting);
1205 }
1206
1207 while (cd != NULL) {
1208 int rcode;
1209
1210 fr_assert(li == cd->listen);
1211 if (li->write_hexdump) HEXDUMP2(cd->m.data, cd->m.data_size, "%s writing ", li->name);
	/* s->written is passed so the transport knows how much of this message was already sent */
1212 rcode = li->app_io->write(li, cd->packet_ctx,
1213 cd->reply.request_time,
1214 cd->m.data, cd->m.data_size, s->written);
1215
1216 /*
1217 * Write of 0 bytes means an OS bug, and we just discard this packet.
1218 */
1219 if (rcode == 0) {
1220 RATE_LIMIT_GLOBAL(ERROR, "Discarding packet due to write returning zero for socket %s",
1221 s->listen->name);
1222 goto discard;
1223 }
1224
1225 /*
1226 * Or we have a write error.
1227 */
1228 if (rcode < 0) {
1229 /*
1230 * Stop processing the heap, and set the
1231 * pending message to the current one.
1232 */
1233 if (errno == EWOULDBLOCK) {
	/* Also entered via goto from the partial-write check further down */
1234 save_pending:
1235 fr_assert(!s->pending);
1236
	/*
	 *	A message which is not already localized is copied with
	 *	fr_message_localize() (parented by s) before being kept
	 *	as the pending message.
	 */
1237 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1238 fr_message_t *lm;
1239
1240 lm = fr_message_localize(s, &cd->m, sizeof(*cd));
1241 if (!lm) {
1242 ERROR("Failed saving pending packet");
1243 goto dead;
1244 }
1245
1246 cd = (fr_channel_data_t *) lm;
1247 }
1248
	/* NOTE(review): the condition guarding the PERROR below is not visible in this listing */
1249 if (!s->blocked) {
1251 PERROR("Failed adding write callback to event loop");
1252 goto dead;
1253 }
1254
1255 s->blocked = true;
1256 }
1257
1258 s->pending = cd;
1259 return;
1260 }
1261
1262 PERROR("Failed writing to socket %s", s->listen->name);
1263
1264 switch (errno) {
1265 /*
1266 * As a special hack, check for something
1267 * that will never be returned from a
1268 * real write() routine. Which then
1269 * signals to us that we have to close
1270 * the socket, but NOT complain about it.
1271 */
1272 case ECONNREFUSED:
1273 case ECONNRESET:
1274 break;
1275
1276 /*
1277 * These are temporary errors. We just discard the data.
1278 */
1279 case ENETDOWN:
1280 case ENETUNREACH:
1281 if (li->app_io->error) li->app_io->error(li);
1282
1283 fr_message_done(&cd->m);
1284 return;
1285
1286 default:
1287 if (li->app_io->error) li->app_io->error(li);
1288 break;
1289 }
1290
	/* NOTE(review): one statement between fr_message_done() and return is not visible in this listing */
1291 dead:
1292 fr_message_done(&cd->m);
1294 return;
1295 }
1296
1297 /*
1298 * If we've done a partial write, localize the message and continue.
1299 */
1300 if ((size_t) rcode < cd->m.data_size) {
1301 s->written = rcode;
1302 goto save_pending;
1303 }
1304
	/* Reached on a complete write, or via goto for a zero-byte write */
1305 discard:
1306 s->written = 0;
1307
1308 /*
1309 * Reset for the next message.
1310 */
1311 fr_message_done(&cd->m);
1312 nr->stats.out++;
1313 s->stats.out++;
1314
1315 /*
1316 * Grab the next entry.
1317 */
1318 cd = fr_heap_pop(&s->waiting);
1319 }
1320
1321 /*
1322 * We've successfully written all of the packets. Remove
1323 * the write callback.
1324 */
	/* NOTE(review): the call whose failure is reported below is not visible in this listing */
1326 PERROR("Failed removing write callback from event loop");
1328 }
1329
1330 s->blocked = false;
1331}
1332
/*
 *	Destructor for a fr_network_socket_t; it is installed with
 *	talloc_set_destructor() where sockets are allocated.
 *
 *	Removes the socket from nr->sockets, removes its FD from the event
 *	loop unless the socket is marked dead, closes the transport (via
 *	app_io->close when provided, plain close() otherwise), marks every
 *	message still queued in s->waiting as done, and frees the listener.
 *	Always returns 0.
 *
 *	NOTE(review): the signature, the declaration of cd, and two further
 *	statements are not visible in this listing.
 */
1334{
1335 fr_network_t *nr = s->nr;
1337
	/* The socket must not be freed while it still has packets outstanding */
1338 fr_assert(s->outstanding == 0);
1339
1340 fr_rb_delete(nr->sockets, s);
1342
1343 if (!s->dead) fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
1344
1345 if (s->listen->app_io->close) {
1346 s->listen->app_io->close(s->listen);
1347 } else {
1348 close(s->listen->fd);
1349 }
1350
	/* NOTE(review): the statement releasing the pending message is not visible in this listing */
1351 if (s->pending) {
1353 s->pending = NULL;
1354 }
1355
1356 /*
1357 * Clean up any queued entries.
1358 */
1359 while ((cd = fr_heap_pop(&s->waiting)) != NULL) {
1360 fr_message_done(&cd->m);
1361 }
1362
1363 /* s->waiting is already talloc parented from s */
1364 talloc_free(s->listen);
1365
1366 return 0;
1367}
1368
1369
1370/** Handle a network control message callback for a new listener
1371 *
1372 * @param[in] ctx the network
1373 * @param[in] data the message
1374 * @param[in] data_size size of the data
1375 * @param[in] now the current time
1376 */
1377static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1378{
1379 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1380 fr_listen_t *li;
1381
1382 fr_assert(data_size == sizeof(li));
1383
1384 if (data_size != sizeof(li)) return;
1385
1386 li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1387
1388 (void) fr_network_listen_add_self(nr, li);
1389}
1390
1391static void fr_network_limit_ringbuffer(fr_network_socket_t *s, int *num_messages_p, size_t *size_p)
1392{
1393 int num_messages;
1394 size_t size;
1395
1396 num_messages = s->listen->num_messages;
1397 if (num_messages < 8) num_messages = 8;
1398 if (num_messages > (1 << 20)) num_messages = (1 << 20);
1399
1400 size = s->listen->default_message_size * num_messages;
1401 if (size < (1 << 17)) size = (1 << 17);
1402 if (size > (100 * 1024 * 1024)) size = (100 * 1024 * 1024);
1403
1404 *num_messages_p = num_messages;
1405 *size_p = size;
1406}
1407
/*
 *	Add a listener to this network thread.
 *
 *	Non-socket listeners are only given the event list.  For every other
 *	listener a fr_network_socket_t is allocated under nr, the listener is
 *	re-parented under it, a message set is created, the FD is inserted
 *	into the event loop, and the socket is added to nr->sockets and
 *	nr->sockets_by_num.
 *
 *	Returns 0 on success and -1 on failure (the new socket is freed on
 *	failure, which also frees the listener that was stolen into it).
 *
 *	NOTE(review): the signature, the declaration of s, and several
 *	argument / statement lines are not visible in this listing.  Based on
 *	the visible call site the parameters are (nr, li).
 */
1409{
1411 fr_app_io_t const *app_io;
1412 size_t size;
1413 int num_messages;
1414
1415 fr_assert(li->app_io != NULL);
1416
1417 /*
1418 * Non-socket listeners just get told about the event
1419 * list, and nothing else.
1420 */
1421 if (li->non_socket_listener) {
1422 fr_assert(li->app_io->event_list_set != NULL);
1423 fr_assert(!li->app_io->read);
1424 fr_assert(!li->app_io->write);
1425
1426 li->app_io->event_list_set(li, nr->el, nr);
1427
1428 /*
1429 * We use fr_log() here to avoid the "Network - " prefix.
1430 */
1431 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listener %s bound to virtual server %s",
1432 li->name, cf_section_name2(li->server_cs));
1433
1434 return 0;
1435 }
1436
1437 s = talloc_zero(nr, fr_network_socket_t);
1438 fr_assert(s != NULL);
	/* The socket now owns the listener; freeing s frees li as well */
1439 talloc_steal(s, li);
1440
1441 s->nr = nr;
1442 s->listen = li;
1443 s->number = nr->num_sockets++;
1444
1445 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1446
1447 talloc_set_destructor(s, _network_socket_free);
1448
1449 /*
1450 * Put reasonable limits on the ring buffer size. Then
1451 * round it up to the nearest power of 2, which is
1452 * required by the ring buffer code.
1453 */
	/*
	 *	NOTE(review): fr_network_limit_ringbuffer() only clamps the
	 *	values.  Any rounding to a power of 2 presumably happens
	 *	inside the message set code - confirm.
	 */
1454 fr_network_limit_ringbuffer(s, &num_messages, &size);
1455
1456 /*
1457 * Allocate the ring buffer for messages and packets.
1458 */
1459 s->ms = fr_message_set_create(s, num_messages,
1460 sizeof(fr_channel_data_t),
1461 size, false);
1462 if (!s->ms) {
1463 PERROR("Failed creating message buffers for network IO");
1464 talloc_free(s);
1465 return -1;
1466 }
1467
1468 app_io = s->listen->app_io;
1470
	/* NOTE(review): the callback arguments of this call are not visible in this listing */
1471 if (fr_event_fd_insert(nr, NULL, nr->el, s->listen->fd,
1475 s) < 0) {
1476 PERROR("Failed adding new socket to network event loop");
1477 talloc_free(s);
1478 return -1;
1479 }
1480
1481 /*
1482 * Start off with write updates being paused. We don't
1483 * care about being able to write if there's nothing to
1484 * write.
1485 */
1487
1488 /*
1489 * Add the listener before calling the app_io, so that
1490 * the app_io can find the listener which we're adding
1491 * here.
1492 */
1493 (void) fr_rb_insert(nr->sockets, s);
1494 (void) fr_rb_insert(nr->sockets_by_num, s);
1495
1496 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1497
1498 /*
1499 * We use fr_log() here to avoid the "Network - " prefix.
1500 */
	/* NOTE(review): the remaining arguments of this call are not visible in this listing */
1501 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listening on %s bound to virtual server %s",
1503
1504 DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
1505
1506 return 0;
1507}
1508
1509/** Handle a network control message callback for a new "watch directory"
1510 *
 * Allocates a fr_network_socket_t for the listener carried in the message,
 * creates its message set, gives the app_io the event list, inserts an event
 * filter for the listener's FD, and finally adds the socket to nr->sockets
 * and nr->sockets_by_num.  On any failure the new socket is freed and the
 * function returns without registering it.
 *
 * NOTE(review): the declarations of s and funcs, and the statement which
 * sets up the filter, are not visible in this listing.
 *
1511 * @param[in] ctx the network
1512 * @param[in] data the message
1513 * @param[in] data_size size of the data
1514 * @param[in] now the current time
1515 */
1516static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1517{
1518 int num_messages;
1519 size_t size;
1520 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
	/*
	 *	NOTE(review): this initializer dereferences data before
	 *	data_size has been validated below, and li is then
	 *	overwritten by the memcpy().  Consider moving the type check
	 *	after the size check.
	 */
1521 fr_listen_t *li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1523 fr_app_io_t const *app_io;
1525
1526 if (!fr_cond_assert(data_size == sizeof(li))) return;
1527
1528 memcpy(&li, data, sizeof(li));
1529
1530 s = talloc_zero(nr, fr_network_socket_t);
1531 fr_assert(s != NULL);
	/* The socket now owns the listener; freeing s frees li as well */
1532 talloc_steal(s, li);
1533
1534 s->nr = nr;
1535 s->listen = li;
1536 s->number = nr->num_sockets++;
1537
1538 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1539
1540 talloc_set_destructor(s, _network_socket_free);
1541
1542 /*
1543 * Allocate the ring buffer for messages and packets.
1544 */
1545 fr_network_limit_ringbuffer(s, &num_messages, &size);
1546
1547 s->ms = fr_message_set_create(s, num_messages,
1548 sizeof(fr_channel_data_t),
1549 size, false);
1550 if (!s->ms) {
1551 PERROR("Failed creating message buffers for directory IO");
1552 talloc_free(s);
1553 return;
1554 }
1555
1556 app_io = s->listen->app_io;
1557
1558 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1559
1561
	/* The error callback is only installed when the app_io has its own error handler */
1562 if (fr_event_filter_insert(nr, NULL, nr->el, s->listen->fd, s->filter,
1563 &funcs,
1564 app_io->error ? fr_network_error : NULL,
1565 s) < 0) {
1566 PERROR("Failed adding directory monitor event loop");
1567 talloc_free(s);
1568 return;
1569 }
1570
1571 (void) fr_rb_insert(nr->sockets, s);
1572 (void) fr_rb_insert(nr->sockets_by_num, s);
1573
1574 DEBUG3("Using new socket with FD %d", s->listen->fd);
1575}
1576
1577/** Handle a network control message callback for a new worker
1578 *
 * The message payload is a fr_worker_t pointer.  A fr_network_worker_t is
 * allocated under nr, a channel to the worker is created, and the new entry
 * is stored in the first free slot of nr->workers.  The message is rejected
 * with an error if nr->num_workers has already reached nr->max_workers.
 *
 * NOTE(review): the declaration of w and a few statements following the
 * channel creation are not visible in this listing.
 *
1579 * @param[in] ctx the network
1580 * @param[in] data the message
1581 * @param[in] data_size size of the data
1582 * @param[in] now the current time
1583 */
1584static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1585{
1586 int i;
1587 fr_network_t *nr = ctx;
1588 fr_worker_t *worker;
1590
1591 fr_assert(data_size == sizeof(worker));
1592
1593 if (nr->num_workers >= nr->max_workers) {
1594 ERROR("Too many workers");
1595 return;
1596 }
1597
	/*
	 *	NOTE(review): the copy length is data_size, which is only
	 *	checked by the fr_assert() above.
	 */
1598 memcpy(&worker, data, data_size);
1599 (void) talloc_get_type_abort(worker, fr_worker_t);
1600
1601 MEM(w = talloc_zero(nr, fr_network_worker_t));
1602
1603 w->worker = worker;
1604 w->channel = fr_worker_channel_create(worker, w, nr->control);
1606 fr_fatal_assert_msg(w->channel, "Failed creating new channel");
1607
1610
1611 /*
1612 * Insert the worker into the array of workers.
1613 */
1614 for (i = 0; i < nr->max_workers; i++) {
1615 if (nr->workers[i]) continue;
1616
1617 nr->workers[i] = w;
1618 nr->num_workers++;
1619 return;
1620 }
	/* Falling out of the loop means no free slot was found; w is then not recorded in nr->workers */
1621}
1622
1623/** Handle a network control message callback for a packet sent to a socket
1624 *
 * The message payload is a fr_network_inject_t.  The socket is looked up by
 * listener in nr->sockets; if it is found the packet is handed to the
 * listener's app_io->inject(), and on success fr_network_read() is called to
 * read it back.  The packet is freed in every case.
 *
 * NOTE(review): the declaration of s is not visible in this listing.
 *
1625 * @param[in] ctx the network
1626 * @param[in] data the message
1627 * @param[in] data_size size of the data
1628 * @param[in] now the current time
1629 */
1630static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1631{
1632 fr_network_t *nr = ctx;
1633 fr_network_inject_t my_inject;
1635
1636 fr_assert(data_size == sizeof(my_inject));
1637
	/*
	 *	NOTE(review): the copy length is data_size, which is only
	 *	checked by the fr_assert() above.
	 */
1638 memcpy(&my_inject, data, data_size);
1639 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = my_inject.listen });
1640 if (!s) {
1641 talloc_free(my_inject.packet); /* MUST be its own TALLOC_CTX */
1642 return;
1643 }
1644
1645 /*
1646 * Inject the packet, and then read it back from the
1647 * network.
1648 */
1649 if (s->listen->app_io->inject(s->listen, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
1650 fr_network_read(nr->el, s->listen->fd, 0, s);
1651 }
1652
1653 talloc_free(my_inject.packet);
1654}
1655
1656/** Run the event loop 'pre' callback
1657 *
1658 * This function MUST DO NO WORK. All it does is check if there's
1659 * work, and tell the event code to return to the main loop if
1660 * there's work to do.
1661 *
 * NOTE(review): the signature is not visible in this listing.
 *
1662 * @param[in] now the current time.
1663 * @param[in] wake the time when the event loop will wake up.
1664 * @param[in] uctx the network
 * @return
 *	- 1 if there is at least one reply queued in nr->replies.
 *	- 0 otherwise.
1665 */
1667{
1668 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1669
1670 if (fr_heap_num_elements(nr->replies) > 0) return 1;
1671
1672 return 0;
1673}
1674
1675/** Handle replies after all FD and timer events have been serviced
1676 *
 * Drains nr->replies.  Each reply is matched to its socket by listener and
 * then either dropped (unknown socket, dead socket, or empty reply) or
 * queued on the socket's "waiting" heap.  If the socket has no pending
 * message, fr_network_write() is called straight away.
 *
 * NOTE(review): the signature and the declarations of cd and s are not
 * visible in this listing.
 *
1677 * @param el the event loop
1678 * @param now the current time (mostly)
1679 * @param uctx the fr_network_t
1680 */
1682{
1684 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1685
1686 /*
1687 * Pull the replies off of our global heap, and try to
1688 * push them to the individual sockets.
1689 */
1690 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1691 fr_listen_t *li;
1693
1694 li = cd->listen;
1695
1696 /*
1697 * @todo - cache this somewhere so we don't need
1698 * to do an rbtree lookup for every packet.
1699 */
1700 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1701
1702 /*
1703 * This shouldn't happen, but be safe...
1704 */
1705 if (!s) {
1706 fr_message_done(&cd->m);
1707 continue;
1708 }
1709
	/* Localized messages are not counted in s->outstanding, so only other messages decrement it */
1710 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1711 fr_assert(s->outstanding > 0);
1712 s->outstanding--;
1713 }
1714
1715 /*
1716 * Just mark the message done, and skip it.
1717 */
1718 if (s->dead) {
1719 fr_message_done(&cd->m);
1720
1721 /*
1722 * No more packets, it's safe to delete
1723 * the socket.
1724 */
1725 if (!s->outstanding) talloc_free(s);
1726
1727 continue;
1728 }
1729
1730 /*
1731 * No data to write to the socket, so we skip the message.
1732 */
1733 if (!cd->m.data_size) {
1734 fr_message_done(&cd->m);
1735 continue;
1736 }
1737
1738 (void) fr_heap_insert(&s->waiting, cd);
1739
1740 /*
1741 * No pending message, write it. If there is a pending write, the message will be left
1742 * in the waiting queue.
1743 */
1744 if (!s->pending) {
1745 fr_assert(!s->blocked);
1746 fr_network_write(nr->el, s->listen->fd, 0, s);
1747 }
1748 }
1749}
1750
1751/** Stop a network thread in an orderly way
1752 *
 * Frees every socket in nr->sockets, marks all queued replies as done, walks
 * the worker array, and sets nr->exiting.
 *
 * NOTE(review): the signature, the declaration of cd, the per-worker
 * statement, and a few statements around nr->exiting are not visible in
 * this listing.
 *
1753 * @param[in] nr the network to stop
 * @return
 *	- 0 on success.
 *	- -1 if the socket tree could not be flattened.
1754 */
1756{
1758
1759 (void) talloc_get_type_abort(nr, fr_network_t);
1760
1761 /*
1762 * Close the network sockets
1763 */
1764 {
1765 fr_network_socket_t **sockets;
1766 size_t len;
1767 size_t i;
1768
	/*
	 *	Work from a flattened copy of the tree, as freeing a
	 *	socket removes it from nr->sockets in its destructor.
	 */
1769 if (fr_rb_flatten_inorder(nr, (void ***)&sockets, nr->sockets) < 0) return -1;
1770 len = talloc_array_length(sockets);
1771
1772 for (i = 0; i < len; i++) {
1773 /*
1774 * Force to zero so we don't trigger asserts
1775 * if packets are being processed and the
1776 * server exits.
1777 */
1778 sockets[i]->outstanding = 0;
1779 talloc_free(sockets[i]);
1780 }
1781
1782 talloc_free(sockets);
1783 }
1784
1785
1786 /*
1787 * Clean up all outstanding replies.
1788 *
1789 * We can't do this after signalling the
1790 * workers to close, because they free
1791 * their message sets, and we end up
1792 * getting random use-after-free errors
1793 * as there's a race between the network
1794 * popping replies, and the workers
1795 * freeing their message sets.
1796 *
1797 * This isn't perfect, and we might still
1798 * lose some replies, but it's good enough
1799 * for now.
1800 *
1801 * @todo - call transport "done" for the reply, so that
1802 * it knows the replies are done, too.
1803 */
1804 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1805 fr_message_done(&cd->m);
1806 }
1807
1808 /*
1809 * Signal the workers that we're closing
1810 *
1811 * nr->num_workers is decremented every
1812 * time a worker closes a socket.
1813 *
1814 * When nr->num_workers == 0, the event
1815 * loop (fr_network()) will exit.
1816 */
1817 {
1818 int i;
1819
	/* NOTE(review): the statement which signals each worker is not visible in this listing */
1820 for (i = 0; i < nr->num_workers; i++) {
1821 fr_network_worker_t *worker = nr->workers[i];
1822
1824 }
1825 }
1826
1829 nr->exiting = true;
1831
1832 return 0;
1833}
1834
1835/** Read handler for signal pipe
1836 *
1837 */
1838static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
1839{
1840 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1841 uint8_t buff;
1842
1843 if (read(fd, &buff, sizeof(buff)) < 0) {
1844 ERROR("Failed reading signal - %s", fr_syserror(errno));
1845 return;
1846 }
1847
1848 fr_assert(buff == 1);
1849
1850 /*
1851 * fr_network_stop() will signal the workers
1852 * to exit (by closing their channels).
1853 *
1854 * When we get the ack, we decrement our
1855 * nr->num_workers counter.
1856 *
1857 * When the counter reaches 0, the event loop
1858 * exits.
1859 */
1860 DEBUG2("Signalled to exit");
1861
1862 if (unlikely(fr_network_destroy(nr) < 0)) {
1863 PERROR("Failed stopping network");
1864 }
1865}
1866
1867/** The main network worker function.
1868 *
 * Loops, gathering events with fr_event_corral() and servicing them with
 * fr_event_service(), until the network is exiting and has no workers left,
 * or until fr_event_corral() returns a negative value.
 *
 * NOTE(review): the signature is not visible in this listing.
 *
1869 * @param[in] nr the network data structure to run.
1870 */
1872{
1873 /*
1874 * Run until we're told to exit AND the number of
1875 * workers has dropped to zero.
1876 *
1877 * This is important as if we exit too early we
1878 * free the channels out from underneath the
1879 * workers and they read uninitialised memory.
1880 *
1881 * Whenever a worker ACKs our close notification
1882 * nr->num_workers is decremented, so when
1883 * nr->num_workers == 0, all workers have ACKd
1884 * our close and are no longer using the channel.
1885 */
1886 while (likely(!(nr->exiting && (nr->num_workers == 0)))) {
1887 bool wait_for_event;
1888 int num_events;
1889
1890 /*
1891 * There are runnable requests. We still service
1892 * the event loop, but we don't wait for events.
1893 */
	/* Only wait for new events when there are no replies queued */
1894 wait_for_event = (fr_heap_num_elements(nr->replies) == 0);
1895
1896 /*
1897 * Check the event list. If there's an error
1898 * (e.g. exit), we stop looping and clean up.
1899 */
1900 DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1901 num_events = fr_event_corral(nr->el, fr_time(), wait_for_event);
1902 DEBUG4("%u event(s) pending%s",
1903 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1904 if (num_events < 0) break;
1905
1906 /*
1907 * Service outstanding events.
1908 */
1909 if (num_events > 0) {
1910 DEBUG4("Servicing event(s)");
1911 fr_event_service(nr->el);
1912 }
1913 }
1914 return;
1915}
1916
1917/** Signal a network thread to exit
1918 *
 * Writes a single byte (0x01) to the write end of the network's signal pipe.
 * On failure the error text is recorded with fr_strerror_printf().
 *
 * NOTE(review): the signature is not visible in this listing.
 *
1919 * @note Request to exit will be processed asynchronously.
1920 *
1921 * @param[in] nr the network data structure to manage
1922 * @return
1923 * - 0 on success.
1924 * - -1 on failure.
1925 */
1927{
1928 if (write(nr->signal_pipe[1], &(uint8_t){ 0x01 }, 1) < 0) {
1929 fr_strerror_printf("Failed signalling network thread to exit - %s", fr_syserror(errno));
1930 return -1;
1931 }
1932
1933 return 0;
1934}
1935
1936/** Free any resources associated with a network thread
1937 *
 * Closes both ends of the signal pipe.  An end whose descriptor is negative
 * (i.e. was never opened) is skipped.  Always returns 0.
 *
 * NOTE(review): the signature is not visible in this listing.
 *
1938 */
1940{
1941 if (nr->signal_pipe[0] >= 0) close(nr->signal_pipe[0]);
1942 if (nr->signal_pipe[1] >= 0) close(nr->signal_pipe[1]);
1943
1944 return 0;
1945}
1946
1947/** Create a network
1948 *
 * Allocates the fr_network_t, its control queue, socket trees and reply
 * heap, registers the pre / post event callbacks, and creates the
 * non-blocking signal pipe which is used to ask the network to exit.
 *
 * Every failure path frees nr.  Its destructor closes whichever signal pipe
 * descriptors have been opened.
 *
 * NOTE(review): the last line of the signature and a number of statements
 * (the ring buffer creation, the control callback registrations, and the
 * tree allocations) are not visible in this listing.
 *
1949 * @param[in] ctx The talloc ctx
1950 * @param[in] el The event list
1951 * @param[in] name Networker identifier.
1952 * @param[in] logger The destination for all logging messages
1953 * @param[in] lvl Log level
1954 * @param[in] config configuration structure.
1955 * @return
1956 * - NULL on error
1957 * - fr_network_t on success
1958 */
1959fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
1960 fr_log_t const *logger, fr_log_lvl_t lvl,
1962{
1963 fr_network_t *nr;
1964
1965 nr = talloc_zero(ctx, fr_network_t);
1966 if (!nr) {
1967 fr_strerror_const("Failed allocating memory");
1968 return NULL;
1969 }
1970 talloc_set_destructor(nr, _fr_network_free);
1971
	/* NOTE(review): the result of talloc_strdup() is not checked */
1972 nr->name = talloc_strdup(nr, name);
1973
1974 nr->thread_id = pthread_self();
1975 nr->el = el;
1976 nr->log = logger;
1977 nr->lvl = lvl;
1978
1980 nr->num_workers = 0;
	/* Mark both pipe ends as "not open", so the destructor skips them until pipe() succeeds */
1981 nr->signal_pipe[0] = -1;
1982 nr->signal_pipe[1] = -1;
1983 if (config) nr->config = *config;
1984
1985 nr->aq_control = fr_atomic_queue_talloc(nr, 1024);
1986 if (!nr->aq_control) {
1987 talloc_free(nr);
1988 return NULL;
1989 }
1990
1991 nr->control = fr_control_create(nr, el, nr->aq_control, 6);
1992 if (!nr->control) {
1993 fr_strerror_const_push("Failed creating control queue");
1994 fail:
1995 talloc_free(nr);
1996 return NULL;
1997 }
1998
1999 /*
2000 * @todo - rely on thread-local variables. And then the
2001 * various users of this can check if (rb == nr->rb), and
2002 * if so, skip the whole control plane / kevent /
2003 * whatever roundabout thing.
2004 */
2006 if (!nr->rb) {
2007 fr_strerror_const_push("Failed creating ring buffer");
	/* Common failure path once the control queue exists: free it, then free nr */
2008 fail2:
2009 talloc_free(nr->control);
2010 goto fail;
2011 }
2012
	/* NOTE(review): the conditions of the next five checks are not visible in this listing */
2014 fr_strerror_const_push("Failed adding channel callback");
2015 goto fail2;
2016 }
2017
2019 fr_strerror_const_push("Failed adding socket callback");
2020 goto fail2;
2021 }
2022
2024 fr_strerror_const_push("Failed adding socket callback");
2025 goto fail2;
2026 }
2027
2029 fr_strerror_const_push("Failed adding worker callback");
2030 goto fail2;
2031 }
2032
2034 fr_strerror_const_push("Failed adding packet injection callback");
2035 goto fail2;
2036 }
2037
2038 if (fr_control_open(nr->control) < 0) {
2039 fr_strerror_const_push("Failed opening control queue");
2040 goto fail2;
2041 }
2042
2043 /*
2044 * Create the various heaps.
2045 */
2047 if (!nr->sockets) {
2048 fr_strerror_const_push("Failed creating listen tree for sockets");
2049 goto fail2;
2050 }
2051
2053 if (!nr->sockets_by_num) {
2054 fr_strerror_const_push("Failed creating number tree for sockets");
2055 goto fail2;
2056 }
2057
2058 nr->replies = fr_heap_alloc(nr, reply_cmp, fr_channel_data_t, channel.heap_id, 0);
2059 if (!nr->replies) {
2060 fr_strerror_const_push("Failed creating heap for replies");
2061 goto fail2;
2062 }
2063
2064 if (fr_event_pre_insert(nr->el, fr_network_pre_event, nr) < 0) {
2065 fr_strerror_const("Failed adding pre-check to event list");
2066 goto fail2;
2067 }
2068
2069 if (fr_event_post_insert(nr->el, fr_network_post_event, nr) < 0) {
2070 fr_strerror_const("Failed inserting post-processing event");
2071 goto fail2;
2072 }
2073
2074 if (pipe(nr->signal_pipe) < 0) {
2075 fr_strerror_printf("Failed initialising signal pipe - %s", fr_syserror(errno));
2076 goto fail2;
2077 }
2078 if (fr_nonblock(nr->signal_pipe[0]) < 0) goto fail2;
2079 if (fr_nonblock(nr->signal_pipe[1]) < 0) goto fail2;
2080
	/* The read end of the pipe is serviced by _signal_pipe_read() from the event loop */
2081 if (fr_event_fd_insert(nr, NULL, nr->el, nr->signal_pipe[0], _signal_pipe_read, NULL, NULL, nr) < 0) {
2082 fr_strerror_const("Failed inserting event for signal pipe");
2083 goto fail2;
2084 }
2085
2086 return nr;
2087}
2088
2089int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
2090{
2091 if (num < 0) return -1;
2092 if (num == 0) return 0;
2093
2094 stats[0] = nr->stats.in;
2095 if (num >= 2) stats[1] = nr->stats.out;
2096 if (num >= 3) stats[2] = nr->stats.dup;
2097 if (num >= 4) stats[3] = nr->stats.dropped;
2098 if (num >= 5) stats[4] = nr->num_workers;
2099
2100 if (num <= 5) return num;
2101
2102 return 5;
2103}
2104
2105void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
2106{
2107 int i;
2108
2109 /*
2110 * Dump all of the channel statistics.
2111 */
2112 for (i = 0; i < nr->max_workers; i++) {
2113 if (!nr->workers[i]) continue;
2114
2115 fr_channel_stats_log(nr->workers[i]->channel, log, __FILE__, __LINE__);
2116 }
2117}
2118
2119static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2120{
2121 fr_network_t const *nr = ctx;
2122
2123 fprintf(fp, "count.in\t%" PRIu64 "\n", nr->stats.in);
2124 fprintf(fp, "count.out\t%" PRIu64 "\n", nr->stats.out);
2125 fprintf(fp, "count.dup\t%" PRIu64 "\n", nr->stats.dup);
2126 fprintf(fp, "count.dropped\t%" PRIu64 "\n", nr->stats.dropped);
2127 fprintf(fp, "count.sockets\t%u\n", fr_rb_num_elements(nr->sockets));
2128
2129 return 0;
2130}
2131
/*
 *	Command handler: list the sockets of one network thread.
 *
 *	Walks nr->sockets in order.  When the app_io has a get_name callback
 *	the socket number and that name are printed; otherwise only the
 *	app_io's common name is printed.  Always returns 0.
 *
 *	NOTE(review): the declarations of iter and s are not visible in this
 *	listing.
 */
2132static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2133{
2134 fr_network_t const *nr = ctx;
2137
2138 // @todo - note that this isn't thread-safe!
2139
2140 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
2141 s != NULL;
2142 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
2143 if (!s->listen->app_io->get_name) {
2144 fprintf(fp, "%s\n", s->listen->app_io->common.name);
2145 } else {
2146 fprintf(fp, "%d\t%s\n", s->number, s->listen->app_io->get_name(s->listen));
2147 }
2148 }
2149 return 0;
2150}
2151
/*
 *	Command handler: print the counters of one socket.
 *
 *	The socket is looked up in nr->sockets_by_num using the number given
 *	as the first command argument.  Returns 0 on success, or -1 (after
 *	writing a message to fp_err) when no socket has that number.
 *
 *	NOTE(review): the declaration of s is not visible in this listing.
 */
2152static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
2153{
2154 fr_network_t const *nr = ctx;
2156
2157 s = fr_rb_find(nr->sockets_by_num, &(fr_network_socket_t){ .number = info->box[0]->vb_uint32 });
2158 if (!s) {
2159 fprintf(fp_err, "No such socket number '%s'.\n", info->argv[0]);
2160 return -1;
2161 }
2162
2163 fprintf(fp, "count.in\t%" PRIu64 "\n", s->stats.in);
2164 fprintf(fp, "count.out\t%" PRIu64 "\n", s->stats.out);
2165 fprintf(fp, "count.dup\t%" PRIu64 "\n", s->stats.dup);
2166 fprintf(fp, "count.dropped\t%" PRIu64 "\n", s->stats.dropped);
2167
2168 return 0;
2169}
2170
2171
	/*
	 *	Entries of the command table for network threads: the
	 *	"stats network" and "show network" command trees.
	 *
	 *	NOTE(review): the declaration line of the table and its
	 *	terminating entry are not visible in this listing.
	 */
	/* Parent node for the "stats network ..." commands */
2173 {
2174 .parent = "stats",
2175 .name = "network",
2176 .help = "Statistics for network threads.",
2177 .read_only = true
2178 },
2179
	/* Handled by cmd_stats_self() */
2180 {
2181 .parent = "stats network",
2182 .add_name = true,
2183 .name = "self",
2184 .func = cmd_stats_self,
2185 .help = "Show statistics for a specific network thread.",
2186 .read_only = true
2187 },
2188
	/* Handled by cmd_stats_socket(), which reads the socket number from its first argument */
2189 {
2190 .parent = "stats network",
2191 .add_name = true,
2192 .name = "socket",
2193 .syntax = "INTEGER",
2194 .func = cmd_stats_socket,
2195 .help = "Show statistics for a specific socket",
2196 .read_only = true
2197 },
2198
	/* Parent node for the "show network ..." commands */
2199 {
2200 .parent = "show",
2201 .name = "network",
2202 .help = "Show information about network threads.",
2203 .read_only = true
2204 },
2205
	/* Handled by cmd_socket_list() */
2206 {
2207 .parent = "show network",
2208 .add_name = true,
2209 .name = "socket",
2210 .syntax = "list",
2211 .func = cmd_socket_list,
2212 .help = "List the sockets associated with this network thread.",
2213 .read_only = true
2214 },
2215
2217};
static int const char char buffer[256]
Definition acutest.h:576
fr_io_close_t close
Close the transport.
Definition app_io.h:60
fr_io_data_read_t read
Read from a socket to a data buffer.
Definition app_io.h:47
module_t common
Common fields to all loadable modules.
Definition app_io.h:34
fr_io_signal_t error
There was an error on the socket.
Definition app_io.h:59
fr_app_event_list_set_t event_list_set
Called by the network thread to pass an event list for use by the app_io_t.
Definition app_io.h:36
fr_io_data_inject_t inject
Inject a packet into a socket.
Definition app_io.h:50
fr_io_data_vnode_t vnode
Handle notifications that the VNODE has changed.
Definition app_io.h:52
fr_io_data_write_t write
Write from a data buffer to a socket.
Definition app_io.h:48
fr_io_name_t get_name
get the socket name
Definition app_io.h:70
Public structure describing an I/O path for a protocol.
Definition app_io.h:33
fr_app_priority_get_t priority
Assign a priority to the packet.
Definition application.h:90
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:224
fr_atomic_queue_t * fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:512
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:407
#define UNUSED
Definition build.h:336
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1306
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition channel.c:918
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:151
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:406
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:822
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:304
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition channel.c:926
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:683
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition channel.c:906
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition channel.c:959
A full channel, which consists of two ends.
Definition channel.c:142
fr_message_t m
the message header
Definition channel.h:107
fr_channel_event_t
Definition channel.h:69
@ FR_CHANNEL_NOOP
Definition channel.h:76
@ FR_CHANNEL_EMPTY
Definition channel.h:77
@ FR_CHANNEL_CLOSE
Definition channel.h:74
@ FR_CHANNEL_ERROR
Definition channel.h:70
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:72
@ FR_CHANNEL_OPEN
Definition channel.h:73
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:71
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:144
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:148
#define FR_CONTROL_ID_CHANNEL
Definition channel.h:67
#define PRIORITY_NORMAL
Definition channel.h:153
#define PRIORITY_NOW
Definition channel.h:151
uint32_t priority
Priority of this packet.
Definition channel.h:142
Channel information which is added to a message.
Definition channel.h:106
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:176
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:40
static int sockfd
Definition dhcpclient.c:55
fr_dict_t const * fr_dict_proto_dict(fr_dict_t const *dict)
Definition dict_util.c:5270
#define fr_event_fd_insert(...)
Definition event.h:247
fr_event_filter_t
The type of filter to install for an FD.
Definition event.h:82
@ FR_EVENT_FILTER_VNODE
Filter for vnode subfilters.
Definition event.h:84
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:83
#define fr_event_filter_update(...)
Definition event.h:239
#define fr_event_filter_insert(...)
Definition event.h:234
#define FR_EVENT_RESUME(_s, _f)
Re-add the filter for a func from kevent.
Definition event.h:131
#define FR_EVENT_SUSPEND(_s, _f)
Temporarily remove the filter for a func from kevent.
Definition event.h:115
fr_event_fd_cb_t extend
Additional files were added to a directory.
Definition event.h:198
Callbacks for the FR_EVENT_FILTER_IO filter.
Definition event.h:188
Structure describing a modification to a filter's state.
Definition event.h:96
Callbacks for the FR_EVENT_FILTER_VNODE filter.
Definition event.h:195
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:325
unsigned int fr_heap_index_t
Definition heap.h:80
#define fr_heap_alloc(_ctx, _cmp, _type, _field, _init)
Creates a heap that can be used with non-talloced elements.
Definition heap.h:100
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define FR_HEAP_INDEX_INVALID
Definition heap.h:83
The main heap structure.
Definition heap.h:66
talloc_free(hp)
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
The control structure.
Definition control.c:76
bool read_hexdump
Do we debug hexdump packets as they're read.
Definition listen.h:53
size_t num_messages
for the message ring buffer
Definition listen.h:57
bool non_socket_listener
special internal listener that does not use sockets.
Definition listen.h:47
char const * name
printable name for this socket - set by open
Definition listen.h:29
void const * app_instance
Definition listen.h:39
size_t default_message_size
copied from app_io, but may be changed
Definition listen.h:56
bool write_hexdump
Do we debug hexdump packets as they're written.
Definition listen.h:54
fr_app_t const * app
Definition listen.h:38
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:42
bool no_write_callback
sometimes we don't need to do writes
Definition listen.h:46
int fd
file descriptor for this socket - set by open
Definition listen.h:28
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
bool needs_full_setup
Set to true to avoid the short cut when adding the listener.
Definition listen.h:48
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
fr_ring_buffer_t * rb
ring buffer for my control-plane messages
Definition network.c:121
fr_cmd_table_t cmd_network_table[]
Definition network.c:2172
size_t written
however much we did in a partial write
Definition network.c:87
int fr_network_listen_send_packet(fr_network_t *nr, fr_listen_t *parent, fr_listen_t *li, const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
Send a packet to the worker.
Definition network.c:798
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition network.c:117
static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition network.c:2152
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition network.c:231
bool suspended
whether or not we're suspended.
Definition network.c:112
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in a different thread.
Definition network.c:300
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition network.c:1755
fr_network_t * nr
O(N) issues in talloc.
Definition network.c:72
fr_io_stats_t stats
Definition network.c:65
fr_listen_t * listen
Definition network.c:47
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new listener.
Definition network.c:1377
uint8_t * packet
Definition network.c:48
static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2119
fr_log_t const * log
log destination
Definition network.c:114
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition network.c:285
static int _fr_network_free(fr_network_t *nr)
Free any resources associated with a network thread.
Definition network.c:1939
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a packet sent to a socket.
Definition network.c:1630
fr_heap_index_t heap_id
for the sockets_by_num heap
Definition network.c:74
#define RTT(_old, _new)
Definition network.c:507
void fr_network(fr_network_t *nr)
The main network worker function.
Definition network.c:1871
fr_message_set_t * ms
message buffers for this socket.
Definition network.c:84
int fr_network_listen_delete(fr_network_t *nr, fr_listen_t *li)
Delete a socket from a network.
Definition network.c:266
int num_blocked
number of blocked workers
Definition network.c:133
static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
ssize_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li)
Get the number of outstanding packets.
Definition network.c:840
char const * name
Network ID for logging.
Definition network.c:108
void fr_network_worker_add_self(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in the same thread.
Definition network.c:320
unsigned int outstanding
number of outstanding packets sent to the worker
Definition network.c:81
static _Thread_local fr_ring_buffer_t * fr_network_rb
Definition network.c:44
int number
unique ID
Definition network.c:73
static fr_event_update_t const resume_write[]
Definition network.c:471
fr_time_delta_t predicted
predicted processing time for one packet
Definition network.c:59
static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx)
static fr_event_update_t const pause_read[]
Definition network.c:456
fr_worker_t * worker
worker pointer
Definition network.c:64
int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
Definition network.c:1090
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition network.c:1926
#define MAX_WORKERS
Definition network.c:42
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
Inject a packet for a listener to read.
Definition network.c:405
fr_listen_t * listen
I/O ctx and functions.
Definition network.c:82
int num_sockets
actually a counter...
Definition network.c:136
fr_rb_node_t listen_node
rbtree node for looking up by listener.
Definition network.c:69
static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
Get a notification that a vnode changed.
Definition network.c:1134
static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Read handler for signal pipe.
Definition network.c:1838
#define OUTSTANDING(_x)
Definition network.c:624
static int8_t reply_cmp(void const *one, void const *two)
Definition network.c:152
int num_workers
number of active workers
Definition network.c:132
static int8_t socket_num_cmp(void const *one, void const *two)
Definition network.c:181
int num_pending_workers
number of workers we're waiting to start.
Definition network.c:134
fr_rb_tree_t * sockets
list of sockets we're managing, ordered by the listener
Definition network.c:129
pthread_t thread_id
for self
Definition network.c:110
fr_log_lvl_t lvl
debug log level
Definition network.c:115
int signal_pipe[2]
Pipe for signalling the worker in an orderly way.
Definition network.c:138
fr_channel_data_t * pending
the currently pending partial packet
Definition network.c:89
static int8_t socket_listen_cmp(void const *one, void const *two)
Definition network.c:174
static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
Write packets to the network.
Definition network.c:1186
fr_event_list_t * el
our event list
Definition network.c:123
fr_heap_t * replies
replies from the worker, ordered by priority / origin time
Definition network.c:125
static int fr_network_send_request(fr_network_t *nr, fr_channel_data_t *cd)
Send a message on the "best" channel.
Definition network.c:631
static fr_event_update_t const resume_read[]
Definition network.c:461
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition network.c:2105
fr_heap_t * waiting
packets waiting to be written
Definition network.c:90
int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
Definition network.c:2089
fr_heap_index_t heap_id
workers are in a heap
Definition network.c:57
bool blocked
is this worker blocked?
Definition network.c:61
static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
Read a packet from the network.
Definition network.c:899
static bool is_network_thread(fr_network_t const *nr)
Definition network.c:219
fr_rb_node_t num_node
rbtree node for looking up by number.
Definition network.c:70
static void fr_network_error(UNUSED fr_event_list_t *el, UNUSED int sockfd, int flags, int fd_errno, void *ctx)
Handle errors for a socket.
Definition network.c:1159
fr_io_stats_t stats
Definition network.c:91
static fr_ring_buffer_t * fr_network_rb_init(void)
Initialise thread local storage.
Definition network.c:201
fr_channel_data_t * cd
cached in case of allocation & read error
Definition network.c:85
static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
Definition network.c:1408
static void fr_network_suspend(fr_network_t *nr)
Definition network.c:476
bool dead
is it dead?
Definition network.c:78
size_t leftover
leftover data from a previous read
Definition network.c:86
static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx)
fr_network_worker_t * workers[MAX_WORKERS]
each worker
Definition network.c:144
fr_time_t recv_time
Definition network.c:50
static void fr_network_unsuspend(fr_network_t *nr)
Definition network.c:491
fr_rb_tree_t * sockets_by_num
ordered by number
Definition network.c:130
fr_network_config_t config
configuration
Definition network.c:143
void fr_network_listen_read(fr_network_t *nr, fr_listen_t *li)
Signal the network to read from a listener.
Definition network.c:331
static int8_t waiting_cmp(void const *one, void const *two)
Definition network.c:163
fr_io_stats_t stats
Definition network.c:127
static void fr_network_limit_ringbuffer(fr_network_socket_t *s, int *num_messages_p, size_t *size_p)
Definition network.c:1391
static int _fr_network_rb_free(void *arg)
Definition network.c:192
static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the network side.
Definition network.c:515
int max_workers
maximum number of allowed workers
Definition network.c:135
void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, void *packet_ctx, fr_time_t request_time)
Inject a packet for a listener to write.
Definition network.c:357
bool exiting
are we exiting?
Definition network.c:141
fr_event_filter_t filter
what type of filter it is
Definition network.c:76
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
Definition network.c:856
static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new "watch directory".
Definition network.c:1516
fr_channel_t * channel
channel to the worker
Definition network.c:63
static fr_event_update_t const pause_write[]
Definition network.c:466
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition network.c:1959
static int _network_socket_free(fr_network_socket_t *s)
Definition network.c:1333
static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a network control message callback for a channel.
Definition network.c:563
static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2132
fr_time_delta_t cpu_time
how much CPU time this worker has spent
Definition network.c:58
fr_control_t * control
the control plane
Definition network.c:119
bool blocked
is it blocked?
Definition network.c:79
Associate a worker thread with a network thread.
Definition network.c:56
uint32_t max_outstanding
Definition network.h:46
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
#define HEXDUMP2(_data, _len, _fmt,...)
Definition log.h:734
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:653
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2187
int fr_event_pre_delete(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Delete a pre-event callback from the event list.
Definition event.c:1985
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2055
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2033
#define fr_time()
Definition event.c:60
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1963
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1203
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2010
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:616
fr_log_lvl_t
Definition log.h:64
@ L_DBG
Only displayed when debugging is enabled.
Definition log.h:56
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
void fr_message_and_data_reset(fr_message_set_t *ms, fr_message_t *m)
Cancel a reservation made by fr_message_and_data_reserve(), returning the slot to the set.
Definition message.c:1026
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:247
fr_message_t * fr_message_and_data_commit_with_leftover(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1159
fr_message_t * fr_message_and_data_commit(fr_message_set_t *ms, fr_message_t *m, size_t total_size)
Commit a previously reserved message, allocating exactly total_size bytes of packet data.
Definition message.c:1051
fr_message_t * fr_message_and_data_alloc(fr_message_set_t *ms, size_t size)
Reserve and commit a message atomically.
Definition message.c:1108
fr_message_t * fr_message_and_data_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:973
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
int fr_nonblock(UNUSED int fd)
Definition misc.c:293
static const conf_parser_t config[]
Definition base.c:163
#define fr_assert(_expr)
Definition rad_assert.h:37
#define DEBUG2(fmt,...)
static fr_app_io_t app_io
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition rand.c:104
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:244
int fr_rb_flatten_inorder(TALLOC_CTX *ctx, void **out[], fr_rb_tree_t *tree)
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
static char const * name
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
Definition log.h:93
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:804
#define talloc_get_type_abort_const
Definition talloc.h:117
#define talloc_strdup(_ctx, _str)
Definition talloc.h:149
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:255
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_delta_ispos(_a)
Definition time.h:290
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
static fr_event_list_t * el
static fr_slen_t parent
Definition pair.h:858
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:737
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340
fr_dict_t const * virtual_server_dict_by_cs(CONF_SECTION const *server_cs)
Return the namespace for the virtual server specified by a config section.
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1620
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1647
A worker which takes packets from a master, and processes them.
Definition worker.c:90
#define FR_CONTROL_ID_INJECT
Definition worker.h:39
#define FR_CONTROL_ID_DIRECTORY
Definition worker.h:38
#define FR_CONTROL_ID_LISTEN
Definition worker.h:36
#define FR_CONTROL_ID_WORKER
Definition worker.h:37