The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
network.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: b74c422b5dfc91ac6b0eb369268217701debb328 $
19 *
20 * @brief Receiver of socket data, which sends messages to the workers.
21 * @file io/network.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: b74c422b5dfc91ac6b0eb369268217701debb328 $")
26
27#define LOG_PREFIX nr->name
28
29#define LOG_DST nr->log
30
31#include <freeradius-devel/util/event.h>
32#include <freeradius-devel/util/misc.h>
33#include <freeradius-devel/util/rand.h>
34#include <freeradius-devel/util/rb.h>
35#include <freeradius-devel/util/syserror.h>
36#include <freeradius-devel/util/atexit.h>
37#include <freeradius-devel/util/talloc.h>
38
39#include <freeradius-devel/io/channel.h>
40#include <freeradius-devel/io/control.h>
41#include <freeradius-devel/io/listen.h>
42#include <freeradius-devel/io/network.h>
43#include <freeradius-devel/io/queue.h>
44#include <freeradius-devel/io/ring_buffer.h>
45#include <freeradius-devel/io/worker.h>
46
47#define MAX_WORKERS 64
48
49static _Thread_local fr_ring_buffer_t *fr_network_rb;
50
57
58/** Associate a worker thread with a network thread
59 *
60 */
61typedef struct {
62 fr_heap_index_t heap_id; //!< workers are in a heap
63 fr_time_delta_t cpu_time; //!< how much CPU time this worker has spent
64 fr_time_delta_t predicted; //!< predicted processing time for one packet
65
66 bool blocked; //!< is this worker blocked?
67
68 fr_channel_t *channel; //!< channel to the worker
69 fr_worker_t *worker; //!< worker pointer
72
73typedef struct {
74 fr_rb_node_t listen_node; //!< rbtree node for looking up by listener.
75 fr_rb_node_t num_node; //!< rbtree node for looking up by number.
76
77 fr_network_t *nr; //!< O(N) issues in talloc
78 int number; //!< unique ID
79 fr_heap_index_t heap_id; //!< for the sockets_by_num heap
80
81 fr_event_filter_t filter; //!< what type of filter it is
82
83 bool dead; //!< is it dead?
84 bool blocked; //!< is it blocked?
85
86 unsigned int outstanding; //!< number of outstanding packets sent to the worker
87 fr_listen_t *listen; //!< I/O ctx and functions.
88
89 fr_message_set_t *ms; //!< message buffers for this socket.
90 fr_channel_data_t *cd; //!< cached in case of allocation & read error
91 size_t leftover; //!< leftover data from a previous read
92 size_t written; //!< however much we did in a partial write
93
94 fr_channel_data_t *pending; //!< the currently pending partial packet
95 fr_heap_t *waiting; //!< packets waiting to be written
98
99/*
100 * We have an array of workers, so we can index the workers in
101 * O(1) time. remove the heap of "workers ordered by CPU time"
102 * when we send a packet to a worker, just update the predicted
103 * CPU time in place. when we receive a reply from a worker,
104 * just update the predicted CPU time in place.
105 *
106 * when we need to choose a worker, pick 2 at random, and then
107 * choose the one with the lowe cpu time. For background, see
108 * "Power of Two-Choices" and
109 * https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
110 * https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
111 */
113 char const *name; //!< Network ID for logging.
114
115 pthread_t thread_id; //!< for self
116
117 bool suspended; //!< whether or not we're suspended.
118
119 fr_log_t const *log; //!< log destination
120 fr_log_lvl_t lvl; //!< debug log level
121
122 fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
123
124 fr_control_t *control; //!< the control plane
125
126 fr_ring_buffer_t *rb; //!< ring buffer for my control-plane messages
127
128 fr_event_list_t *el; //!< our event list
129
130 fr_heap_t *replies; //!< replies from the worker, ordered by priority / origin time
131
133
134 fr_rb_tree_t *sockets; //!< list of sockets we're managing, ordered by the listener
135 fr_rb_tree_t *sockets_by_num; //!< ordered by number;
136
137 int num_workers; //!< number of active workers
138 int num_blocked; //!< number of blocked workers
139 int num_pending_workers; //!< number of workers we're waiting to start.
140 int max_workers; //!< maximum number of allowed workers
141 int num_sockets; //!< actually a counter...
142
143 int signal_pipe[2]; //!< Pipe for signalling the worker in an orderly way.
144 ///< This is more deterministic than using async signals.
145
146 bool exiting; //!< are we exiting?
147
148 fr_network_config_t config; //!< configuration
150};
151
152static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx);
153static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx);
155static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx);
156
157static int8_t reply_cmp(void const *one, void const *two)
158{
159 fr_channel_data_t const *a = one, *b = two;
160 int ret;
161
162 ret = CMP(a->priority, b->priority);
163 if (ret != 0) return ret;
164
165 return fr_time_cmp(a->m.when, b->m.when);
166}
167
168static int8_t waiting_cmp(void const *one, void const *two)
169{
170 fr_channel_data_t const *a = one, *b = two;
171 int ret;
172
173 ret = CMP(a->priority, b->priority);
174 if (ret != 0) return ret;
175
176 return fr_time_cmp(a->reply.request_time, b->reply.request_time);
177}
178
179static int8_t socket_listen_cmp(void const *one, void const *two)
180{
181 fr_network_socket_t const *a = one, *b = two;
182
183 return CMP(a->listen, b->listen);
184}
185
186static int8_t socket_num_cmp(void const *one, void const *two)
187{
188 fr_network_socket_t const *a = one, *b = two;
189
190 return CMP(a->number, b->number);
191}
192
193/*
194 * Explicitly cleanup the memory allocated to the ring buffer,
195 * just in case valgrind complains about it.
196 */
197static int _fr_network_rb_free(void *arg)
198{
199 return talloc_free(arg);
200}
201
202/** Initialise thread local storage
203 *
204 * @return fr_ring_buffer_t for messages
205 */
207{
209
211 if (rb) return rb;
212
214 if (!rb) {
215 fr_perror("Failed allocating memory for network ring buffer");
216 return NULL;
217 }
218
220
221 return rb;
222}
223
224static inline bool is_network_thread(fr_network_t const *nr)
225{
226 return (pthread_equal(pthread_self(), nr->thread_id) != 0);
227}
228
230
231/** Add a fr_listen_t to a network
232 *
233 * @param nr the network
234 * @param li the listener
235 */
237{
239
240 /*
241 * Skip a bunch of work if we're already in the network thread.
242 */
243 if (is_network_thread(nr) && !li->needs_full_setup) {
244 return fr_network_listen_add_self(nr, li);
245 }
246
248 if (!rb) return -1;
249
250 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
251}
252
253
254/** Delete a socket from a network. MUST be called only by the listener itself!.
255 *
256 * @param nr the network
257 * @param li the listener
258 */
260{
262
264
265 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
266 if (!s) return -1;
267
269
270 return 0;
271}
272
273/** Add a "watch directory" call to a network
274 *
275 * @param nr the network
276 * @param li the listener
277 */
279{
281
283 if (!rb) return -1;
284
285 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_DIRECTORY, &li, sizeof(li));
286}
287
288/** Add a worker to a network in a different thread
289 *
290 * @param nr the network
291 * @param worker the worker
292 */
294{
296
298 if (!rb) return -1;
299
300 (void) talloc_get_type_abort(nr, fr_network_t);
301 (void) talloc_get_type_abort(worker, fr_worker_t);
302
303 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_WORKER, &worker, sizeof(worker));
304}
305
306static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now);
307
308/** Add a worker to a network in the same thread
309 *
310 * @param nr the network
311 * @param worker the worker
312 */
314{
315 fr_network_worker_started_callback(nr, &worker, sizeof(worker), fr_time_wrap(0));
316}
317
318
319/** Signal the network to read from a listener
320 *
321 * @param nr the network
322 * @param li the listener to read from
323 */
325{
327
328 (void) talloc_get_type_abort(nr, fr_network_t);
330
331 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
332 if (!s) return;
333
334 /*
335 * Go read the socket.
336 */
337 fr_network_read(nr->el, s->listen->fd, 0, s);
338}
339
340
341/** Inject a packet for a listener to write
342 *
343 * @param nr the network
344 * @param li the listener where the packet is being injected
345 * @param packet the packet to be written
346 * @param packet_len the length of the packet
347 * @param packet_ctx The packet context to write
348 * @param request_time when the packet was received.
349 */
350void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len,
351 void *packet_ctx, fr_time_t request_time)
352{
353 fr_message_t *lm;
355
356 cd = (fr_channel_data_t) {
357 .m = (fr_message_t) {
359 .data_size = packet_len,
360 .when = request_time,
361 },
362
363 .channel = {
364 .heap_id = FR_HEAP_INDEX_INVALID,
365 },
366
367 .listen = li,
368 .priority = PRIORITY_NOW,
369 .reply.request_time = request_time,
370 };
371
372 memcpy(&cd.m.data, &packet, sizeof(packet)); /* const issues */
373 memcpy(&cd.packet_ctx, &packet_ctx, sizeof(packet_ctx)); /* const issues */
374
375 /*
376 * Localize the message and insert it into the heap of pending messages.
377 */
378 lm = fr_message_localize(nr, &cd.m, sizeof(cd));
379 if (!lm) return;
380
381 if (fr_heap_insert(&nr->replies, lm) < 0) {
382 fr_message_done(lm);
383 }
384}
385
386
387/** Inject a packet for a listener to read
388 *
389 * @param nr the network
390 * @param li the listener where the packet is being injected
391 * @param packet the packet to be injected
392 * @param packet_len the length of the packet
393 * @param recv_time when the packet was received.
394 * @return
395 * - <0 on error
396 * - 0 on success
397 */
398int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
399{
401 fr_network_inject_t my_inject;
402
403 /*
404 * Can't inject to injection-less destinations.
405 */
406 if (!li->app_io->inject) {
407 fr_strerror_const("Listener cannot accept injected packet");
408 return -1;
409 }
410
411 /*
412 * Avoid a bounce through the event loop if we're being called from the network thread.
413 */
414 if (is_network_thread(nr)) {
416
417 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
418 if (!s) {
419 fr_strerror_const("Listener was not found for injected packet");
420 return -1;
421 }
422
423 /*
424 * Inject the packet. The master.c mod_read() routine will then take care of avoiding
425 * IO, and instead return the packet to the network side.
426 */
427 if (li->app_io->inject(li, packet, packet_len, recv_time) == 0) {
428 (void) fr_network_read(nr->el, li->fd, 0, s);
429 }
430
431 return 0;
432 }
433
435 if (!rb) return -1;
436
437 my_inject.listen = li;
438 my_inject.packet = talloc_memdup(NULL, packet, packet_len);
439 my_inject.packet_len = packet_len;
440 my_inject.recv_time = recv_time;
441
442 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
443}
444
446{
447 static fr_event_update_t pause_read[] = {
449 { 0 }
450 };
453
454 if (nr->suspended) return;
455
456 for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
457 s != NULL;
458 s = fr_rb_iter_next_inorder(&iter)) {
460 }
461 nr->suspended = true;
462}
463
465{
466 static fr_event_update_t resume_read[] = {
468 { 0 }
469 };
472
473 if (!nr->suspended) return;
474
475 for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
476 s != NULL;
477 s = fr_rb_iter_next_inorder(&iter)) {
479 }
480 nr->suspended = false;
481}
482
483#define IALPHA (8)
484#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
485
486/** Callback which handles a message being received on the network side.
487 *
488 * @param[in] ctx the network
489 * @param[in] ch the channel that the message is on.
490 * @param[in] cd the message (if any) to start with
491 */
493{
494 fr_network_t *nr = ctx;
495 fr_network_worker_t *worker;
496
497 cd->channel.ch = ch;
498
499 /*
500 * Update stats for the worker.
501 */
503 worker->stats.out++;
504 worker->cpu_time = cd->reply.cpu_time;
505 if (!fr_time_delta_ispos(worker->predicted)) {
506 worker->predicted = cd->reply.processing_time;
507 } else {
508 worker->predicted = RTT(worker->predicted, cd->reply.processing_time);
509 }
510
511 /*
512 * Unblock the worker.
513 */
514 if (worker->blocked) {
515 worker->blocked = false;
516 nr->num_blocked--;
518 }
519
520 /*
521 * Ensure that heap insert works.
522 */
523 cd->channel.heap_id = FR_HEAP_INDEX_INVALID;
524 if (fr_heap_insert(&nr->replies, cd) < 0) {
525 fr_message_done(&cd->m);
526 fr_assert(0 == 1);
527 }
528}
529
530/** Handle a network control message callback for a channel
531 *
532 * This is called from the event loop when we get a notification
533 * from the event signalling pipe.
534 *
535 * @param[in] ctx the network
536 * @param[in] data the message
537 * @param[in] data_size size of the data
538 * @param[in] now the current time
539 */
540static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
541{
543 fr_channel_t *ch;
544 fr_network_t *nr = ctx;
545
546 ce = fr_channel_service_message(now, &ch, data, data_size);
547 DEBUG3("Channel %s",
548 fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
549 switch (ce) {
550 case FR_CHANNEL_ERROR:
551 return;
552
553 case FR_CHANNEL_EMPTY:
554 return;
555
556 case FR_CHANNEL_NOOP:
557 break;
558
560 fr_assert(ch != NULL);
561 while (fr_channel_recv_reply(ch));
562 break;
563
565 fr_assert(0 == 1);
566 break;
567
568 case FR_CHANNEL_OPEN:
569 fr_assert(0 == 1);
570 break;
571
572 case FR_CHANNEL_CLOSE:
573 {
574 fr_network_worker_t *w = talloc_get_type_abort(fr_channel_requestor_uctx_get(ch),
576 int i;
577
578 /*
579 * Remove this worker from the array
580 */
581 for (i = 0; i < nr->num_workers; i++) {
582 DEBUG3("Worker acked our close request");
583 if (nr->workers[i] == w) {
584 nr->workers[i] = NULL;
585
586 if (i == (nr->num_workers - 1)) break;
587
588 /*
589 * Close the hole...
590 */
591 memcpy(&nr->workers[i], &nr->workers[i + 1], ((nr->num_workers - i) - 1));
592 break;
593 }
594 }
595 nr->num_workers--;
596 }
597 break;
598 }
599}
600
601#define OUTSTANDING(_x) ((_x)->stats.in - (_x)->stats.out)
602
603/** Send a message on the "best" channel.
604 *
605 * @param nr the network
606 * @param cd the message we've received
607 */
609{
610 fr_network_worker_t *worker;
611
612 (void) talloc_get_type_abort(nr, fr_network_t);
613
614retry:
615 if (nr->num_workers == 1) {
616 worker = nr->workers[0];
617 if (worker->blocked) {
618 RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
619 "In single-threaded mode and worker is blocked");
620 drop:
621 worker->stats.dropped++;
622 return -1;
623 }
624
625 } else if (nr->num_blocked == 0) {
626 int64_t cmp;
627 uint32_t one, two;
628
629 one = fr_rand() % nr->num_workers;
630 do {
631 two = fr_rand() % nr->num_workers;
632 } while (two == one);
633
634 /*
635 * Choose a worker based on minimizing the amount
636 * of future work it's being asked to do.
637 *
638 * If both workers have the same number of
639 * outstanding requests, then choose the worker
640 * which has used the least total CPU time.
641 */
642 cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
643 if (cmp < 0) {
644 worker = nr->workers[one];
645
646 } else if (cmp > 0) {
647 worker = nr->workers[two];
648
649 } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
650 worker = nr->workers[one];
651
652 } else {
653 worker = nr->workers[two];
654 }
655 } else {
656 int i;
657 uint64_t min_outstanding = UINT64_MAX;
658 fr_network_worker_t *found = NULL;
659
660 /*
661 * Some workers are blocked. Pick the worker
662 * with the least amount of future work to do.
663 */
664 for (i = 0; i < nr->num_workers; i++) {
665 uint64_t outstanding;
666
667 worker = nr->workers[i];
668 if (worker->blocked) continue;
669
670 outstanding = OUTSTANDING(worker);
671 if ((outstanding < min_outstanding) || !found) {
672 found = worker;
673 min_outstanding = outstanding;
674
675 } else if (outstanding == min_outstanding) {
676 /*
677 * Queue lengths are the same.
678 * Choose this worker if it's
679 * less busy than the previous one we found.
680 */
681 if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
682 found = worker;
683 }
684 }
685 }
686
687 if (!found) {
688 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - Couldn't find active worker, "
689 "%u/%u workers are blocked", nr->num_blocked, nr->num_workers);
690 return -1;
691 }
692
693 worker = found;
694 }
695
696 (void) talloc_get_type_abort(worker, fr_network_worker_t);
697
698 /*
699 * Too many outstanding packets for this worker. Drop
700 * the request.
701 *
702 * If the worker we've picked has too many outstanding
703 * packets, then we have either only one worker, in which
704 * cae we should drop the packet. Or, we were unable to
705 * find a worker with smaller than max_outstanding
706 * packets. In which case all of the workers are likely
707 * at max_outstanding.
708 *
709 * In both cases, we should just drop the new packet.
710 */
711 fr_assert(worker->stats.in >= worker->stats.out);
712 if (nr->config.max_outstanding &&
713 (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
714 RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
715 goto drop;
716 }
717
718 /*
719 * Send the message to the channel. If we fail, drop the
720 * packet. The only reason for failure is that the
721 * worker isn't servicing it's input queue. When that
722 * happens, we have no idea what to do, and the whole
723 * thing falls over.
724 */
725 if (fr_channel_send_request(worker->channel, cd) < 0) {
726 worker->stats.dropped++;
727 worker->blocked = true;
728 nr->num_blocked++;
729
730 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - %u/%u workers are blocked",
731 nr->num_blocked, nr->num_workers);
732
733 if (nr->num_blocked == nr->num_workers) {
735 return -1;
736 }
737 goto retry;
738 }
739
740 worker->stats.in++;
741
742 /*
743 * We're projecting that the worker will use more CPU
744 * time to process this request. The CPU time will be
745 * updated with a more accurate number when we receive a
746 * reply from this channel.
747 */
748 worker->cpu_time = fr_time_delta_add(worker->cpu_time, worker->predicted);
749
750 return 0;
751}
752
753
754/** Send a packet to the worker.
755 *
756 * MUST only be called from the network thread.
757 *
758 * @param nr the network
759 * @param parent the parent listener
760 * @param li the listener that the packet was "read" from. Can be "parent"
761 * @param buffer the packet to send
762 * @param buflen size of the packet to send
763 * @param recv_time of the packet
764 * @param packet_ctx for the packet
765 * @return
766 * - <0 on error
767 * - 0 on success
768 */
770 const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
771{
774
775 (void) talloc_get_type_abort(nr, fr_network_t);
777
778 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
779 if (!s) return -1;
780
781 cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, buflen);
782 if (!cd) return -1;
783
784 cd->listen = parent;
786 cd->packet_ctx = packet_ctx;
787 cd->request.recv_time = recv_time;
788 memcpy(cd->m.data, buffer, buflen);
789 cd->m.when = fr_time();
790
791 if (fr_network_send_request(nr, cd) < 0) {
793 fr_message_done(&cd->m);
794 nr->stats.dropped++;
795 s->stats.dropped++;
796 return -1;
797 }
798
799 s->outstanding++;
800 return 0;
801}
802
803/** Get the number of outstanding packets
804 *
805 * @param nr the network
806 * @param li the listener that the packet was "read" from
807 * @return
808 * - <0 on error
809 * - the number of outstanding packets
810*/
813
814 (void) talloc_get_type_abort(nr, fr_network_t);
816
817 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
818 if (!s) return -1;
819
820 return s->outstanding;
821}
822
823/*
824 * Mark it as dead, but DON'T free it until all of the replies
825 * have come in.
826 */
828{
829 int i;
830
831 if (s->dead) return;
832
833 s->dead = true;
834
835 fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
836
837
838 for (i = 0; i < nr->max_workers; i++) {
839 if (!nr->workers[i]) continue;
840
841 (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
842 }
843
844 /*
845 * If there are no outstanding packets, then we can free
846 * it now.
847 */
848 if (!s->outstanding) {
849 talloc_free(s);
850 return;
851 }
852
853 /*
854 * There are still outstanding packets. Leave it in the
855 * socket tree, so that replies from the worker can find
856 * it. When we've received all of the replies, then
857 * fr_network_post_event() will clean up this socket.
858 */
859}
860
861/** Read a packet from the network.
862 *
863 * @param[in] el the event list.
864 * @param[in] sockfd the socket which is ready to read.
865 * @param[in] flags from kevent.
866 * @param[in] ctx the network socket context.
867 */
868static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
869{
870 int num_messages = 0;
871 fr_network_socket_t *s = ctx;
872 fr_network_t *nr = s->nr;
873 ssize_t data_size;
874 fr_channel_data_t *cd, *next;
875
876 if (!fr_cond_assert_msg(s->listen->fd == sockfd, "Expected listen->fd (%u) to be equal event fd (%u)",
877 s->listen->fd, sockfd)) return;
878
879 DEBUG3("Reading data from FD %u", sockfd);
880
881 if (!s->cd) {
883 if (!cd) {
884 ERROR("Failed allocating message size %zd! - Closing socket",
887 return;
888 }
889 } else {
890 cd = s->cd;
891 }
892
893 fr_assert(cd->m.data != NULL);
894
895next_message:
896 /*
897 * Poll this socket, but not too often. We have to go
898 * service other sockets, too.
899 */
900 if (num_messages > 16) {
901 s->cd = cd;
902 return;
903 }
904
906
907 /*
908 * Read data from the network.
909 *
910 * Return of 0 means "no data", which is fine for UDP.
911 * For TCP, if an underlying read() on the TCP socket
912 * returns 0, (which signals that the FD is no longer
913 * usable) this function should return -1, so that the
914 * network side knows that it needs to close the
915 * connection.
916 */
917 data_size = s->listen->app_io->read(s->listen, &cd->packet_ctx, &cd->request.recv_time,
918 cd->m.data, cd->m.rb_size, &s->leftover);
919 if (data_size == 0) {
920 /*
921 * Cache the message for later. This is
922 * important for stream sockets, which can do
923 * partial reads into the current buffer. We
924 * need to be able to give the same buffer back
925 * to the stream socket for subsequent reads.
926 *
927 * Since we have a message set for each
928 * fr_io_socket_t, no "head of line"
929 * blocking issues can happen for stream sockets.
930 */
931 s->cd = cd;
932 return;
933 }
934
935 /*
936 * Error: close the connection, and remove the fr_listen_t
937 */
938 if (data_size < 0) {
939// fr_log(nr->log, L_DBG_ERR, "error from transport read on socket %d", sockfd);
941 return;
942 }
943 s->cd = NULL;
944
945 DEBUG3("Read %zd byte(s) from FD %u", data_size, sockfd);
946 nr->stats.in++;
947 s->stats.in++;
948
949 /*
950 * Initialize the rest of the fields of the channel data.
951 *
952 * We always use "now" as the time of the message, as the
953 * packet MAY be a duplicate packet magically resurrected
954 * from the past. i.e. If the read routines are doing
955 * dedup, then they notice that the packet is a
956 * duplicate. In that case, they send over a copy of the
957 * packet, BUT with the original timestamp. This
958 * information tells the worker that the packet is a
959 * duplicate.
960 */
961 cd->m.when = fr_time();
962 cd->listen = s->listen;
963
964 /*
965 * Nothing in the buffer yet. Allocate room for one
966 * packet.
967 */
968 if ((cd->m.data_size == 0) && (!s->leftover)) {
969
970 (void) fr_message_alloc(s->ms, &cd->m, data_size);
971 next = NULL;
972
973 } else {
974 /*
975 * There are leftover bytes in the buffer, feed
976 * them to the next round of reading.
977 */
978 next = (fr_channel_data_t *) fr_message_alloc_reserve(s->ms, &cd->m, data_size, s->leftover,
980 if (!next) {
981 PERROR("Failed reserving partial packet.");
982 // @todo - probably close the socket...
983 fr_assert(0 == 1);
984 }
985 }
986
987 /*
988 * Set the priority. Which incidentally also checks if
989 * we're allowed to read this particular kind of packet.
990 *
991 * That check is because the app_io handlers just read
992 * packets, and don't really have access to the parent
993 * "list of allowed packet types". So we have to do the
994 * work here in a callback.
995 *
996 * That should probably be fixed...
997 */
998 if (s->listen->app->priority) {
999 int priority;
1000
1001 priority = s->listen->app->priority(s->listen->app_instance, cd->m.data, data_size);
1002 if (priority <= 0) goto discard;
1003
1004 cd->priority = priority;
1005 }
1006
1007 if (fr_network_send_request(nr, cd) < 0) {
1008 discard:
1009 talloc_free(cd->packet_ctx); /* not sure what else to do here */
1010 fr_message_done(&cd->m);
1011 nr->stats.dropped++;
1012 s->stats.dropped++;
1013
1014 } else {
1015 /*
1016 * One more packet sent to a worker.
1017 */
1018 s->outstanding++;
1019 }
1020
1021 /*
1022 * If there is a next message, go read it from the buffer.
1023 *
1024 * @todo - note that this calls read(), even if the
1025 * app_io has paused the reader. We likely want to be
1026 * able to check that, too. We might just remove this
1027 * "goto"...
1028 */
1029 if (next) {
1030 cd = next;
1031 num_messages++;
1032 goto next_message;
1033 }
1034}
1035
1036int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
1037{
1040
1041 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1042 if (!s) return -1;
1043
1044 cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, data_len);
1045 if (!cd) return -1;
1046
1047 s->stats.in++;
1048
1050
1051 cd->m.when = recv_time;
1052 cd->listen = li;
1053 cd->packet_ctx = packet_ctx;
1054
1055 memcpy(cd->m.data, data, data_len);
1056
1057 if (fr_network_send_request(nr, cd) < 0) {
1058 talloc_free(packet_ctx);
1059 fr_message_done(&cd->m);
1060 nr->stats.dropped++;
1061 s->stats.dropped++;
1062
1063 } else {
1064 /*
1065 * One more packet sent to a worker.
1066 */
1067 s->outstanding++;
1068 }
1069
1070 return 0;
1071}
1072
1073
1074/** Get a notification that a vnode changed
1075 *
1076 * @param[in] el the event list.
1077 * @param[in] sockfd the socket which is ready to read.
1078 * @param[in] fflags from kevent.
1079 * @param[in] ctx the network socket context.
1080 */
1081static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
1082{
1083 fr_network_socket_t *s = ctx;
1084 fr_network_t *nr = s->nr;
1085
1087
1088 DEBUG3("network vnode");
1089
1090 /*
1091 * Tell the IO handler that something has happened to the
1092 * file.
1093 */
1094 s->listen->app_io->vnode(s->listen, fflags);
1095}
1096
1097
1098/** Handle errors for a socket.
1099 *
1100 * @param[in] el the event list
1101 * @param[in] sockfd the socket which has a fatal error.
1102 * @param[in] flags returned by kevent.
1103 * @param[in] fd_errno returned by kevent.
1104 * @param[in] ctx the network socket context.
1105 */
1107 int fd_errno, void *ctx)
1108{
1109 fr_network_socket_t *s = ctx;
1110 fr_network_t *nr = s->nr;
1111
1112 if (s->listen->app_io->error) {
1113 s->listen->app_io->error(s->listen);
1114
1115 } else if (flags & EV_EOF) {
1116 DEBUG2("Socket %s closed by peer", s->listen->name);
1117
1118 } else {
1119 ERROR("Socket %s errored - %s", s->listen->name, fr_syserror(fd_errno));
1120 }
1121
1123}
1124
1125
1128 { 0 }
1129};
1130
1133 { 0 }
1134};
1135
1136
1137/** Write packets to the network.
1138 *
1139 * @param el the event list
1140 * @param sockfd the socket which is ready to write
1141 * @param flags returned by kevent.
1142 * @param ctx the network socket context.
1143 */
1144static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
1145{
1146 fr_network_socket_t *s = ctx;
1147 fr_listen_t *li = s->listen;
1148 fr_network_t *nr = s->nr;
1150
1151 (void) talloc_get_type_abort(nr, fr_network_t);
1152
1153 /*
1154 * Start with the currently pending message, and then
1155 * work through the priority heap.
1156 */
1157 if (s->pending) {
1158 cd = s->pending;
1159 s->pending = NULL;
1160
1161 } else {
1162 cd = fr_heap_pop(&s->waiting);
1163 }
1164
1165 while (cd != NULL) {
1166 int rcode;
1167
1168 fr_assert(li == cd->listen);
1169 rcode = li->app_io->write(li, cd->packet_ctx,
1170 cd->reply.request_time,
1171 cd->m.data, cd->m.data_size, s->written);
1172
1173 /*
1174 * As a special case, allow write() to return
1175 * "0", which means "close the socket".
1176 */
1177 if (rcode == 0) goto dead;
1178
1179 /*
1180 * Or we have a write error.
1181 */
1182 if (rcode < 0) {
1183 /*
1184 * Stop processing the heap, and set the
1185 * pending message to the current one.
1186 */
1187 if (errno == EWOULDBLOCK) {
1188 save_pending:
1189 fr_assert(!s->pending);
1190
1191 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1192 fr_message_t *lm;
1193
1194 lm = fr_message_localize(s, &cd->m, sizeof(*cd));
1195 if (!lm) {
1196 ERROR("Failed saving pending packet");
1197 goto dead;
1198 }
1199
1200 cd = (fr_channel_data_t *) lm;
1201 }
1202
1203 if (!s->blocked) {
1205 PERROR("Failed adding write callback to event loop");
1206 goto dead;
1207 }
1208
1209 s->blocked = true;
1210 }
1211
1212 s->pending = cd;
1213 return;
1214 }
1215
1216 /*
1217 * As a special hack, check for something
1218 * that will never be returned from a
1219 * real write() routine. Which then
1220 * signals to us that we have to close
1221 * the socket, but NOT complain about it.
1222 */
1223 if (errno == ECONNREFUSED) goto dead;
1224
1225 PERROR("Failed writing to socket %s", s->listen->name);
1226 if (li->app_io->error) li->app_io->error(li);
1227
1228 dead:
1229 fr_message_done(&cd->m);
1231 return;
1232 }
1233
1234 /*
1235 * If we've done a partial write, localize the message and continue.
1236 */
1237 if ((size_t) rcode < cd->m.data_size) {
1238 s->written = rcode;
1239 goto save_pending;
1240 }
1241
1242 s->written = 0;
1243
1244 /*
1245 * Reset for the next message.
1246 */
1247 fr_message_done(&cd->m);
1248 nr->stats.out++;
1249 s->stats.out++;
1250
1251 /*
1252 * Grab the net entry.
1253 */
1254 cd = fr_heap_pop(&s->waiting);
1255 }
1256
1257 /*
1258 * We've successfully written all of the packets. Remove
1259 * the write callback.
1260 */
1262 PERROR("Failed removing write callback from event loop");
1264 }
1265
1266 s->blocked = false;
1267}
1268
1270{
1271 fr_network_t *nr = s->nr;
1273
1274 fr_assert(s->outstanding == 0);
1275
1276 fr_rb_delete(nr->sockets, s);
1278
1279 fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
1280
1281 if (s->listen->app_io->close) {
1282 s->listen->app_io->close(s->listen);
1283 } else {
1284 close(s->listen->fd);
1285 }
1286
1287 if (s->pending) {
1289 s->pending = NULL;
1290 }
1291
1292 /*
1293 * Clean up any queued entries.
1294 */
1295 while ((cd = fr_heap_pop(&s->waiting)) != NULL) {
1296 fr_message_done(&cd->m);
1297 }
1298
1299 talloc_free(s->waiting);
1300 talloc_free(s->listen);
1301
1302 return 0;
1303}
1304
1305
1306/** Handle a network control message callback for a new listener
1307 *
1308 * @param[in] ctx the network
1309 * @param[in] data the message
1310 * @param[in] data_size size of the data
1311 * @param[in] now the current time
1312 */
1313static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1314{
1315 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1316 fr_listen_t *li;
1317
1318 fr_assert(data_size == sizeof(li));
1319
1320 if (data_size != sizeof(li)) return;
1321
1322 li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1323
1324 (void) fr_network_listen_add_self(nr, li);
1325}
1326
1328{
1330 fr_app_io_t const *app_io;
1331 size_t size;
1332 int num_messages;
1333
1334 fr_assert(li->app_io != NULL);
1335
1336 /*
1337 * Non-socket listeners just get told about the event
1338 * list, and nothing else.
1339 */
1340 if (li->non_socket_listener) {
1341 fr_assert(li->app_io->event_list_set != NULL);
1342 fr_assert(!li->app_io->read);
1343 fr_assert(!li->app_io->write);
1344
1345 li->app_io->event_list_set(li, nr->el, nr);
1346
1347 /*
1348 * We use fr_log() here to avoid the "Network - " prefix.
1349 */
1350 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listener %s bound to virtual server %s",
1351 li->name, cf_section_name2(li->server_cs));
1352
1353 return 0;
1354 }
1355
1356 s = talloc_zero(nr, fr_network_socket_t);
1357 fr_assert(s != NULL);
1358 talloc_steal(s, li);
1359
1360 s->nr = nr;
1361 s->listen = li;
1362 s->number = nr->num_sockets++;
1363
1364 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1365
1366 talloc_set_destructor(s, _network_socket_free);
1367
1368 /*
1369 * Put reasonable limits on the ring buffer size. Then
1370 * round it up to the nearest power of 2, which is
1371 * required by the ring buffer code.
1372 */
1373 num_messages = s->listen->num_messages;
1374 if (num_messages < 8) num_messages = 8;
1375
1376 size = s->listen->default_message_size * num_messages;
1377 if (size < (1 << 17)) size = (1 << 17);
1378 if (size > (100 * 1024 * 1024)) size = (100 * 1024 * 1024);
1379
1380 /*
1381 * Allocate the ring buffer for messages and packets.
1382 */
1383 s->ms = fr_message_set_create(s, num_messages,
1384 sizeof(fr_channel_data_t),
1385 size);
1386 if (!s->ms) {
1387 PERROR("Failed creating message buffers for network IO");
1388 talloc_free(s);
1389 return -1;
1390 }
1391
1392 app_io = s->listen->app_io;
1394
1395 if (fr_event_fd_insert(nr, NULL, nr->el, s->listen->fd,
1399 s) < 0) {
1400 PERROR("Failed adding new socket to network event loop");
1401 talloc_free(s);
1402 return -1;
1403 }
1404
1405 /*
1406 * Start of with write updates being paused. We don't
1407 * care about being able to write if there's nothing to
1408 * write.
1409 */
1411
1412 /*
1413 * Add the listener before calling the app_io, so that
1414 * the app_io can find the listener which we're adding
1415 * here.
1416 */
1417 (void) fr_rb_insert(nr->sockets, s);
1418 (void) fr_rb_insert(nr->sockets_by_num, s);
1419
1420 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1421
1422 /*
1423 * We use fr_log() here to avoid the "Network - " prefix.
1424 */
1425 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listening on %s bound to virtual server %s",
1427
1428 DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
1429
1430 return 0;
1431}
1432
1433/** Handle a network control message callback for a new "watch directory"
1434 *
1435 * @param[in] ctx the network
1436 * @param[in] data the message
1437 * @param[in] data_size size of the data
1438 * @param[in] now the current time
1439 */
1440static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1441{
1442 int num_messages;
1443 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1444 fr_listen_t *li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1446 fr_app_io_t const *app_io;
1448
1449 if (fr_cond_assert(data_size == sizeof(li))) return;
1450
1451 memcpy(&li, data, sizeof(li));
1452
1453 s = talloc_zero(nr, fr_network_socket_t);
1454 fr_assert(s != NULL);
1455 talloc_steal(s, li);
1456
1457 s->nr = nr;
1458 s->listen = li;
1459 s->number = nr->num_sockets++;
1460
1461 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1462
1463 talloc_set_destructor(s, _network_socket_free);
1464
1465 /*
1466 * Allocate the ring buffer for messages and packets.
1467 */
1468 num_messages = s->listen->num_messages;
1469 if (num_messages < 8) num_messages = 8;
1470
1471 s->ms = fr_message_set_create(s, num_messages,
1472 sizeof(fr_channel_data_t),
1474 if (!s->ms) {
1475 PERROR("Failed creating message buffers for directory IO");
1476 talloc_free(s);
1477 return;
1478 }
1479
1480 app_io = s->listen->app_io;
1481
1482 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1483
1485
1486 if (fr_event_filter_insert(nr, NULL, nr->el, s->listen->fd, s->filter,
1487 &funcs,
1488 app_io->error ? fr_network_error : NULL,
1489 s) < 0) {
1490 PERROR("Failed adding directory monitor event loop");
1491 talloc_free(s);
1492 return;
1493 }
1494
1495 (void) fr_rb_insert(nr->sockets, s);
1496 (void) fr_rb_insert(nr->sockets_by_num, s);
1497
1498 DEBUG3("Using new socket with FD %d", s->listen->fd);
1499}
1500
1501/** Handle a network control message callback for a new worker
1502 *
1503 * @param[in] ctx the network
1504 * @param[in] data the message
1505 * @param[in] data_size size of the data
1506 * @param[in] now the current time
1507 */
1508static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1509{
1510 int i;
1511 fr_network_t *nr = ctx;
1512 fr_worker_t *worker;
1514
1515 fr_assert(data_size == sizeof(worker));
1516
1517 memcpy(&worker, data, data_size);
1518 (void) talloc_get_type_abort(worker, fr_worker_t);
1519
1520 MEM(w = talloc_zero(nr, fr_network_worker_t));
1521
1522 w->worker = worker;
1523 w->channel = fr_worker_channel_create(worker, w, nr->control);
1525 fr_fatal_assert_msg(w->channel, "Failed creating new channel");
1526
1529
1530 /*
1531 * FIXME: This creates a race in the network loop
1532 * exit condition, because it can theoretically
1533 * be signalled to exit before the workers have
1534 * ACKd channel creation.
1535 */
1536 nr->num_workers++;
1537
1538 /*
1539 * Insert the worker into the array of workers.
1540 */
1541 for (i = 0; i < nr->max_workers; i++) {
1542 if (nr->workers[i]) continue;
1543
1544 nr->workers[i] = w;
1545 return;
1546 }
1547
1548 /*
1549 * Run out of room to put workers!
1550 */
1551 fr_assert(0 == 1);
1552}
1553
1554/** Handle a network control message callback for a packet sent to a socket
1555 *
1556 * @param[in] ctx the network
1557 * @param[in] data the message
1558 * @param[in] data_size size of the data
1559 * @param[in] now the current time
1560 */
1561static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1562{
1563 fr_network_t *nr = ctx;
1564 fr_network_inject_t my_inject;
1566
1567 fr_assert(data_size == sizeof(my_inject));
1568
1569 memcpy(&my_inject, data, data_size);
1570 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = my_inject.listen });
1571 if (!s) {
1572 talloc_free(my_inject.packet); /* MUST be it's own TALLOC_CTX */
1573 return;
1574 }
1575
1576 /*
1577 * Inject the packet, and then read it back from the
1578 * network.
1579 */
1580 if (s->listen->app_io->inject(s->listen, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
1581 fr_network_read(nr->el, s->listen->fd, 0, s);
1582 }
1583
1584 talloc_free(my_inject.packet);
1585}
1586
1587/** Run the event loop 'pre' callback
1588 *
1589 * This function MUST DO NO WORK. All it does is check if there's
1590 * work, and tell the event code to return to the main loop if
1591 * there's work to do.
1592 *
1593 * @param[in] now the current time.
1594 * @param[in] wake the time when the event loop will wake up.
1595 * @param[in] uctx the network
1596 */
1598{
1599 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1600
1601 if (fr_heap_num_elements(nr->replies) > 0) return 1;
1602
1603 return 0;
1604}
1605
1606/** Handle replies after all FD and timer events have been serviced
1607 *
1608 * @param el the event loop
1609 * @param now the current time (mostly)
1610 * @param uctx the fr_network_t
1611 */
1613{
1615 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1616
1617 /*
1618 * Pull the replies off of our global heap, and try to
1619 * push them to the individual sockets.
1620 */
1621 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1622 fr_listen_t *li;
1624
1625 li = cd->listen;
1626
1627 /*
1628 * @todo - cache this somewhere so we don't need
1629 * to do an rbtree lookup for every packet.
1630 */
1631 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1632
1633 /*
1634 * This shouldn't happen, but be safe...
1635 */
1636 if (!s) {
1637 fr_message_done(&cd->m);
1638 continue;
1639 }
1640
1641 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1642 fr_assert(s->outstanding > 0);
1643 s->outstanding--;
1644 }
1645
1646 /*
1647 * Just mark the message done, and skip it.
1648 */
1649 if (s->dead) {
1650 fr_message_done(&cd->m);
1651
1652 /*
1653 * No more packets, it's safe to delete
1654 * the socket.
1655 */
1656 if (!s->outstanding) talloc_free(s);
1657
1658 continue;
1659 }
1660
1661 /*
1662 * No data to write to the socket, so we skip the message.
1663 */
1664 if (!cd->m.data_size) {
1665 fr_message_done(&cd->m);
1666 continue;
1667 }
1668
1669 /*
1670 * No pending message, let's try writing it.
1671 *
1672 * If there is a pending message, then we're
1673 * waiting for IO write to become ready.
1674 */
1675 if (!s->pending) {
1676 fr_assert(!s->blocked);
1677 (void) fr_heap_insert(&s->waiting, cd);
1678 fr_network_write(nr->el, s->listen->fd, 0, s);
1679 }
1680 }
1681}
1682
1683/** Stop a network thread in an orderly way
1684 *
1685 * @param[in] nr the network to stop
1686 */
1688{
1690
1691 (void) talloc_get_type_abort(nr, fr_network_t);
1692
1693 /*
1694 * Close the network sockets
1695 */
1696 {
1697 fr_network_socket_t **sockets;
1698 size_t len;
1699 size_t i;
1700
1701 if (fr_rb_flatten_inorder(nr, (void ***)&sockets, nr->sockets) < 0) return -1;
1702 len = talloc_array_length(sockets);
1703
1704 for (i = 0; i < len; i++) {
1705 /*
1706 * Force to zero so we don't trigger asserts
1707 * if packets are being processed and the
1708 * server exits.
1709 */
1710 sockets[i]->outstanding = 0;
1711 talloc_free(sockets[i]);
1712 }
1713
1714 talloc_free(sockets);
1715 }
1716
1717
1718 /*
1719 * Clean up all outstanding replies.
1720 *
1721 * We can't do this after signalling the
1722 * workers to close, because they free
1723 * their message sets, and we end up
1724 * getting random use-after-free errors
1725 * as there's a race between the network
1726 * popping replies, and the workers
1727 * freeing their message sets.
1728 *
1729 * This isn't perfect, and we might still
1730 * lose some replies, but it's good enough
1731 * for now.
1732 *
1733 * @todo - call transport "done" for the reply, so that
1734 * it knows the replies are done, too.
1735 */
1736 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1737 fr_message_done(&cd->m);
1738 }
1739
1740 /*
1741 * Signal the workers that we're closing
1742 *
1743 * nr->num_workers is decremented every
1744 * time a worker closes a socket.
1745 *
1746 * When nr->num_workers == 0, the event
1747 * loop (fr_network()) will exit.
1748 */
1749 {
1750 int i;
1751
1752 for (i = 0; i < nr->num_workers; i++) {
1753 fr_network_worker_t *worker = nr->workers[i];
1754
1756 }
1757 }
1758
1761 nr->exiting = true;
1763
1764 return 0;
1765}
1766
1767/** Read handler for signal pipe
1768 *
1769 */
1770static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
1771{
1772 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1773 uint8_t buff;
1774
1775 if (read(fd, &buff, sizeof(buff)) < 0) {
1776 ERROR("Failed reading signal - %s", fr_syserror(errno));
1777 return;
1778 }
1779
1780 fr_assert(buff == 1);
1781
1782 /*
1783 * fr_network_stop() will signal the workers
1784 * to exit (by closing their channels).
1785 *
1786 * When we get the ack, we decrement our
1787 * nr->num_workers counter.
1788 *
1789 * When the counter reaches 0, the event loop
1790 * exits.
1791 */
1792 DEBUG2("Signalled to exit");
1793
1794 if (unlikely(fr_network_destroy(nr) < 0)) {
1795 PERROR("Failed stopping network");
1796 }
1797}
1798
1799/** The main network worker function.
1800 *
1801 * @param[in] nr the network data structure to run.
1802 */
1804{
1805 /*
1806 * Run until we're told to exit AND the number of
1807 * workers has dropped to zero.
1808 *
1809 * This is important as if we exit too early we
1810 * free the channels out from underneath the
1811 * workers and they read uninitialised memory.
1812 *
1813 * Whenever a worker ACKs our close notification
1814 * nr->num_workers is decremented, so when
1815 * nr->num_workers == 0, all workers have ACKd
1816 * our close and are no longer using the channel.
1817 */
1818 while (likely(!(nr->exiting && (nr->num_workers == 0)))) {
1819 bool wait_for_event;
1820 int num_events;
1821
1822 /*
1823 * There are runnable requests. We still service
1824 * the event loop, but we don't wait for events.
1825 */
1826 wait_for_event = (fr_heap_num_elements(nr->replies) == 0);
1827
1828 /*
1829 * Check the event list. If there's an error
1830 * (e.g. exit), we stop looping and clean up.
1831 */
1832 DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1833 num_events = fr_event_corral(nr->el, fr_time(), wait_for_event);
1834 DEBUG4("%u event(s) pending%s",
1835 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1836 if (num_events < 0) break;
1837
1838 /*
1839 * Service outstanding events.
1840 */
1841 if (num_events > 0) {
1842 DEBUG4("Servicing event(s)");
1843 fr_event_service(nr->el);
1844 }
1845 }
1846 return;
1847}
1848
1849/** Signal a network thread to exit
1850 *
1851 * @note Request to exit will be processed asynchronously.
1852 *
1853 * @param[in] nr the network data structure to manage
1854 * @return
1855 * - 0 on success.
1856 * - -1 on failure.
1857 */
1859{
1860 if (write(nr->signal_pipe[1], &(uint8_t){ 0x01 }, 1) < 0) {
1861 fr_strerror_printf("Failed signalling network thread to exit - %s", fr_syserror(errno));
1862 return -1;
1863 }
1864
1865 return 0;
1866}
1867
1868/** Free any resources associated with a network thread
1869 *
1870 */
1872{
1873 if (nr->signal_pipe[0] >= 0) close(nr->signal_pipe[0]);
1874 if (nr->signal_pipe[1] >= 0) close(nr->signal_pipe[1]);
1875
1876 return 0;
1877}
1878
1879/** Create a network
1880 *
1881 * @param[in] ctx The talloc ctx
1882 * @param[in] el The event list
1883 * @param[in] name Networker identifier.
1884 * @param[in] logger The destination for all logging messages
1885 * @param[in] lvl Log level
1886 * @param[in] config configuration structure.
1887 * @return
1888 * - NULL on error
1889 * - fr_network_t on success
1890 */
1891fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
1892 fr_log_t const *logger, fr_log_lvl_t lvl,
1894{
1895 fr_network_t *nr;
1896
1897 nr = talloc_zero(ctx, fr_network_t);
1898 if (!nr) {
1899 fr_strerror_const("Failed allocating memory");
1900 return NULL;
1901 }
1902 talloc_set_destructor(nr, _fr_network_free);
1903
1904 nr->name = talloc_strdup(nr, name);
1905
1906 nr->thread_id = pthread_self();
1907 nr->el = el;
1908 nr->log = logger;
1909 nr->lvl = lvl;
1910
1912 nr->num_workers = 0;
1913 nr->signal_pipe[0] = -1;
1914 nr->signal_pipe[1] = -1;
1915 if (config) nr->config = *config;
1916
1917 nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
1918 if (!nr->aq_control) {
1919 talloc_free(nr);
1920 return NULL;
1921 }
1922
1923 nr->control = fr_control_create(nr, el, nr->aq_control);
1924 if (!nr->control) {
1925 fr_strerror_const_push("Failed creating control queue");
1926 fail:
1927 talloc_free(nr);
1928 return NULL;
1929 }
1930
1931 /*
1932 * @todo - rely on thread-local variables. And then the
1933 * various users of this can check if (rb == nr->rb), and
1934 * if so, skip the whole control plane / kevent /
1935 * whatever roundabout thing.
1936 */
1938 if (!nr->rb) {
1939 fr_strerror_const_push("Failed creating ring buffer");
1940 fail2:
1941 talloc_free(nr->control);
1942 goto fail;
1943 }
1944
1946 fr_strerror_const_push("Failed adding channel callback");
1947 goto fail2;
1948 }
1949
1951 fr_strerror_const_push("Failed adding socket callback");
1952 goto fail2;
1953 }
1954
1956 fr_strerror_const_push("Failed adding socket callback");
1957 goto fail2;
1958 }
1959
1961 fr_strerror_const_push("Failed adding worker callback");
1962 goto fail2;
1963 }
1964
1966 fr_strerror_const_push("Failed adding packet injection callback");
1967 goto fail2;
1968 }
1969
1970 /*
1971 * Create the various heaps.
1972 */
1974 if (!nr->sockets) {
1975 fr_strerror_const_push("Failed creating listen tree for sockets");
1976 goto fail2;
1977 }
1978
1980 if (!nr->sockets_by_num) {
1981 fr_strerror_const_push("Failed creating number tree for sockets");
1982 goto fail2;
1983 }
1984
1985 nr->replies = fr_heap_alloc(nr, reply_cmp, fr_channel_data_t, channel.heap_id, 0);
1986 if (!nr->replies) {
1987 fr_strerror_const_push("Failed creating heap for replies");
1988 goto fail2;
1989 }
1990
1991 if (fr_event_pre_insert(nr->el, fr_network_pre_event, nr) < 0) {
1992 fr_strerror_const("Failed adding pre-check to event list");
1993 goto fail2;
1994 }
1995
1996 if (fr_event_post_insert(nr->el, fr_network_post_event, nr) < 0) {
1997 fr_strerror_const("Failed inserting post-processing event");
1998 goto fail2;
1999 }
2000
2001 if (pipe(nr->signal_pipe) < 0) {
2002 fr_strerror_printf("Failed initialising signal pipe - %s", fr_syserror(errno));
2003 goto fail2;
2004 }
2005 if (fr_nonblock(nr->signal_pipe[0]) < 0) goto fail2;
2006 if (fr_nonblock(nr->signal_pipe[1]) < 0) goto fail2;
2007
2008 if (fr_event_fd_insert(nr, NULL, nr->el, nr->signal_pipe[0], _signal_pipe_read, NULL, NULL, nr) < 0) {
2009 fr_strerror_const("Failed inserting event for signal pipe");
2010 goto fail2;
2011 }
2012
2013 return nr;
2014}
2015
2016int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
2017{
2018 if (num < 0) return -1;
2019 if (num == 0) return 0;
2020
2021 stats[0] = nr->stats.in;
2022 if (num >= 2) stats[1] = nr->stats.out;
2023 if (num >= 3) stats[2] = nr->stats.dup;
2024 if (num >= 4) stats[3] = nr->stats.dropped;
2025 if (num >= 5) stats[4] = nr->num_workers;
2026
2027 if (num <= 5) return num;
2028
2029 return 5;
2030}
2031
2032void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
2033{
2034 int i;
2035
2036 /*
2037 * Dump all of the channel statistics.
2038 */
2039 for (i = 0; i < nr->max_workers; i++) {
2040 if (!nr->workers[i]) continue;
2041
2042 fr_channel_stats_log(nr->workers[i]->channel, log, __FILE__, __LINE__);
2043 }
2044}
2045
2046static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2047{
2048 fr_network_t const *nr = ctx;
2049
2050 fprintf(fp, "count.in\t%" PRIu64 "\n", nr->stats.in);
2051 fprintf(fp, "count.out\t%" PRIu64 "\n", nr->stats.out);
2052 fprintf(fp, "count.dup\t%" PRIu64 "\n", nr->stats.dup);
2053 fprintf(fp, "count.dropped\t%" PRIu64 "\n", nr->stats.dropped);
2054 fprintf(fp, "count.sockets\t%u\n", fr_rb_num_elements(nr->sockets));
2055
2056 return 0;
2057}
2058
2059static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2060{
2061 fr_network_t const *nr = ctx;
2064
2065 // @todo - note that this isn't thread-safe!
2066
2067 for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
2068 s != NULL;
2069 s = fr_rb_iter_next_inorder(&iter)) {
2070 if (!s->listen->app_io->get_name) {
2071 fprintf(fp, "%s\n", s->listen->app_io->common.name);
2072 } else {
2073 fprintf(fp, "%d\t%s\n", s->number, s->listen->app_io->get_name(s->listen));
2074 }
2075 }
2076 return 0;
2077}
2078
2079static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
2080{
2081 fr_network_t const *nr = ctx;
2083
2084 s = fr_rb_find(nr->sockets_by_num, &(fr_network_socket_t){ .number = info->box[0]->vb_uint32 });
2085 if (!s) {
2086 fprintf(fp_err, "No such socket number '%s'.\n", info->argv[0]);
2087 return -1;
2088 }
2089
2090 fprintf(fp, "count.in\t%" PRIu64 "\n", s->stats.in);
2091 fprintf(fp, "count.out\t%" PRIu64 "\n", s->stats.out);
2092 fprintf(fp, "count.dup\t%" PRIu64 "\n", s->stats.dup);
2093 fprintf(fp, "count.dropped\t%" PRIu64 "\n", s->stats.dropped);
2094
2095 return 0;
2096}
2097
2098
2100 {
2101 .parent = "stats",
2102 .name = "network",
2103 .help = "Statistics for network threads.",
2104 .read_only = true
2105 },
2106
2107 {
2108 .parent = "stats network",
2109 .add_name = true,
2110 .name = "self",
2111 .func = cmd_stats_self,
2112 .help = "Show statistics for a specific network thread.",
2113 .read_only = true
2114 },
2115
2116 {
2117 .parent = "stats network",
2118 .add_name = true,
2119 .name = "socket",
2120 .syntax = "INTEGER",
2121 .func = cmd_stats_socket,
2122 .help = "Show statistics for a specific socket",
2123 .read_only = true
2124 },
2125
2126 {
2127 .parent = "show",
2128 .name = "network",
2129 .help = "Show information about network threads.",
2130 .read_only = true
2131 },
2132
2133 {
2134 .parent = "show network",
2135 .add_name = true,
2136 .name = "socket",
2137 .syntax = "list",
2138 .func = cmd_socket_list,
2139 .help = "List the sockets associated with this network thread.",
2140 .read_only = true
2141 },
2142
2144};
static int const char char buffer[256]
Definition acutest.h:576
fr_io_close_t close
Close the transport.
Definition app_io.h:60
fr_io_data_read_t read
Read from a socket to a data buffer.
Definition app_io.h:47
module_t common
Common fields to all loadable modules.
Definition app_io.h:34
fr_io_signal_t error
There was an error on the socket.
Definition app_io.h:59
fr_app_event_list_set_t event_list_set
Called by the network thread to pass an event list for use by the app_io_t.
Definition app_io.h:36
fr_io_data_inject_t inject
Inject a packet into a socket.
Definition app_io.h:50
fr_io_data_vnode_t vnode
Handle notifications that the VNODE has changed.
Definition app_io.h:52
fr_io_data_write_t write
Write from a data buffer to a socket.
Definition app_io.h:48
fr_io_name_t get_name
get the socket name
Definition app_io.h:70
Public structure describing an I/O path for a protocol.
Definition app_io.h:33
fr_app_priority_get_t priority
Assign a priority to the packet.
Definition application.h:90
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:485
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1185
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition channel.c:922
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:306
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition channel.c:930
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition channel.c:910
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition channel.c:963
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
#define PRIORITY_NORMAL
Definition channel.h:151
#define PRIORITY_NOW
Definition channel.h:149
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_INJECT
Definition control.h:60
#define FR_CONTROL_ID_DIRECTORY
Definition control.h:59
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_ID_LISTEN
Definition control.h:57
#define FR_CONTROL_ID_WORKER
Definition control.h:58
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:184
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
static int sockfd
Definition dhcpclient.c:56
#define fr_event_fd_insert(...)
Definition event.h:248
fr_event_filter_t
The type of filter to install for an FD.
Definition event.h:83
@ FR_EVENT_FILTER_VNODE
Filter for vnode subfilters.
Definition event.h:85
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:84
#define fr_event_filter_update(...)
Definition event.h:240
#define fr_event_filter_insert(...)
Definition event.h:235
#define FR_EVENT_RESUME(_s, _f)
Re-add the filter for a func from kevent.
Definition event.h:132
#define FR_EVENT_SUSPEND(_s, _f)
Temporarily remove the filter for a func from kevent.
Definition event.h:116
fr_event_fd_cb_t extend
Additional files were added to a directory.
Definition event.h:199
Callbacks for the FR_EVENT_FILTER_IO filter.
Definition event.h:189
Structure describing a modification to a filter's state.
Definition event.h:97
Callbacks for the FR_EVENT_FILTER_VNODE filter.
Definition event.h:196
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:322
unsigned int fr_heap_index_t
Definition heap.h:80
#define fr_heap_alloc(_ctx, _cmp, _type, _field, _init)
Creates a heap that can be used with non-talloced elements.
Definition heap.h:100
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define FR_HEAP_INDEX_INVALID
Definition heap.h:83
The main heap structure.
Definition heap.h:66
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
size_t num_messages
for the message ring buffer
Definition listen.h:52
bool non_socket_listener
special internal listener that does not use sockets.
Definition listen.h:45
char const * name
printable name for this socket - set by open
Definition listen.h:29
void const * app_instance
Definition listen.h:38
size_t default_message_size
copied from app_io, but may be changed
Definition listen.h:51
fr_app_t const * app
Definition listen.h:37
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:40
bool no_write_callback
sometimes we don't need to do writes
Definition listen.h:44
int fd
file descriptor for this socket - set by open
Definition listen.h:28
bool needs_full_setup
Set to true to avoid the short cut when adding the listener.
Definition listen.h:46
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:31
fr_ring_buffer_t * rb
ring buffer for my control-plane messages
Definition network.c:126
fr_cmd_table_t cmd_network_table[]
Definition network.c:2099
size_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li)
Get the number of outstanding packets.
Definition network.c:811
size_t written
however much we did in a partial write
Definition network.c:92
int fr_network_listen_send_packet(fr_network_t *nr, fr_listen_t *parent, fr_listen_t *li, const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
Send a packet to the worker.
Definition network.c:769
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition network.c:122
static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition network.c:2079
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition network.c:236
bool suspended
whether or not we're suspended.
Definition network.c:117
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in a different thread.
Definition network.c:293
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition network.c:1687
fr_network_t * nr
O(N) issues in talloc.
Definition network.c:77
fr_io_stats_t stats
Definition network.c:70
fr_listen_t * listen
Definition network.c:52
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new listener.
Definition network.c:1313
uint8_t * packet
Definition network.c:53
static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2046
fr_log_t const * log
log destination
Definition network.c:119
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition network.c:278
static int _fr_network_free(fr_network_t *nr)
Free any resources associated with a network thread.
Definition network.c:1871
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a packet sent to a socket.
Definition network.c:1561
fr_heap_index_t heap_id
for the sockets_by_num heap
Definition network.c:79
#define RTT(_old, _new)
Definition network.c:484
void fr_network(fr_network_t *nr)
The main network worker function.
Definition network.c:1803
fr_message_set_t * ms
message buffers for this socket.
Definition network.c:89
int fr_network_listen_delete(fr_network_t *nr, fr_listen_t *li)
Delete a socket from a network.
Definition network.c:259
int num_blocked
number of blocked workers
Definition network.c:138
static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
char const * name
Network ID for logging.
Definition network.c:113
void fr_network_worker_add_self(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in the same thread.
Definition network.c:313
unsigned int outstanding
number of outstanding packets sent to the worker
Definition network.c:86
static _Thread_local fr_ring_buffer_t * fr_network_rb
Definition network.c:49
int number
unique ID
Definition network.c:78
static fr_event_update_t const resume_write[]
Definition network.c:1131
fr_time_delta_t predicted
predicted processing time for one packet
Definition network.c:64
static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx)
fr_worker_t * worker
worker pointer
Definition network.c:69
int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
Definition network.c:1036
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition network.c:1858
#define MAX_WORKERS
Definition network.c:47
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
Inject a packet for a listener to read.
Definition network.c:398
fr_listen_t * listen
I/O ctx and functions.
Definition network.c:87
int num_sockets
actually a counter...
Definition network.c:141
fr_rb_node_t listen_node
rbtree node for looking up by listener.
Definition network.c:74
static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
Get a notification that a vnode changed.
Definition network.c:1081
static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Read handler for signal pipe.
Definition network.c:1770
#define OUTSTANDING(_x)
Definition network.c:601
static int8_t reply_cmp(void const *one, void const *two)
Definition network.c:157
int num_workers
number of active workers
Definition network.c:137
static int8_t socket_num_cmp(void const *one, void const *two)
Definition network.c:186
int num_pending_workers
number of workers we're waiting to start.
Definition network.c:139
fr_rb_tree_t * sockets
list of sockets we're managing, ordered by the listener
Definition network.c:134
pthread_t thread_id
for self
Definition network.c:115
fr_log_lvl_t lvl
debug log level
Definition network.c:120
int signal_pipe[2]
Pipe for signalling the worker in an orderly way.
Definition network.c:143
fr_channel_data_t * pending
the currently pending partial packet
Definition network.c:94
static int8_t socket_listen_cmp(void const *one, void const *two)
Definition network.c:179
static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
Write packets to the network.
Definition network.c:1144
fr_event_list_t * el
our event list
Definition network.c:128
fr_heap_t * replies
replies from the worker, ordered by priority / origin time
Definition network.c:130
static int fr_network_send_request(fr_network_t *nr, fr_channel_data_t *cd)
Send a message on the "best" channel.
Definition network.c:608
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition network.c:2032
fr_heap_t * waiting
packets waiting to be written
Definition network.c:95
int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
Definition network.c:2016
fr_heap_index_t heap_id
workers are in a heap
Definition network.c:62
bool blocked
is this worker blocked?
Definition network.c:66
static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
Read a packet from the network.
Definition network.c:868
static bool is_network_thread(fr_network_t const *nr)
Definition network.c:224
fr_rb_node_t num_node
rbtree node for looking up by number.
Definition network.c:75
static void fr_network_error(UNUSED fr_event_list_t *el, UNUSED int sockfd, int flags, int fd_errno, void *ctx)
Handle errors for a socket.
Definition network.c:1106
fr_io_stats_t stats
Definition network.c:96
static fr_ring_buffer_t * fr_network_rb_init(void)
Initialise thread local storage.
Definition network.c:206
fr_channel_data_t * cd
cached in case of allocation & read error
Definition network.c:90
static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
Definition network.c:1327
static void fr_network_suspend(fr_network_t *nr)
Definition network.c:445
bool dead
is it dead?
Definition network.c:83
size_t leftover
leftover data from a previous read
Definition network.c:91
static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx)
fr_network_worker_t * workers[MAX_WORKERS]
each worker
Definition network.c:149
fr_time_t recv_time
Definition network.c:55
static void fr_network_unsuspend(fr_network_t *nr)
Definition network.c:464
fr_rb_tree_t * sockets_by_num
ordered by number;
Definition network.c:135
fr_network_config_t config
configuration
Definition network.c:148
void fr_network_listen_read(fr_network_t *nr, fr_listen_t *li)
Signal the network to read from a listener.
Definition network.c:324
static int8_t waiting_cmp(void const *one, void const *two)
Definition network.c:168
fr_io_stats_t stats
Definition network.c:132
static int _fr_network_rb_free(void *arg)
Definition network.c:197
static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the network side.
Definition network.c:492
int max_workers
maximum number of allowed workers
Definition network.c:140
void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, void *packet_ctx, fr_time_t request_time)
Inject a packet for a listener to write.
Definition network.c:350
bool exiting
are we exiting?
Definition network.c:146
fr_event_filter_t filter
what type of filter it is
Definition network.c:81
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
Definition network.c:827
static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new "watch directory".
Definition network.c:1440
fr_channel_t * channel
channel to the worker
Definition network.c:68
static fr_event_update_t const pause_write[]
Definition network.c:1126
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition network.c:1891
static int _network_socket_free(fr_network_socket_t *s)
Definition network.c:1269
static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a network control message callback for a channel.
Definition network.c:540
static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2059
fr_time_delta_t cpu_time
how much CPU time this worker has spent
Definition network.c:63
fr_control_t * control
the control plane
Definition network.c:124
bool blocked
is it blocked?
Definition network.c:84
Associate a worker thread with a network thread.
Definition network.c:61
uint32_t max_outstanding
Definition network.h:46
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:641
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2197
int fr_event_pre_delete(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Delete a pre-event callback from the event list.
Definition event.c:1979
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2062
talloc_free(reap)
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2033
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1957
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1206
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2011
Stores all information relating to an event list.
Definition event.c:380
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:583
fr_log_lvl_t
Definition log.h:67
@ L_DBG
Only displayed when debugging is enabled.
Definition log.h:59
static fr_event_update_t pause_read[]
Definition master.c:170
static fr_event_update_t resume_read[]
Definition master.c:175
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:242
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:934
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1077
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
int fr_nonblock(UNUSED int fd)
Definition misc.c:293
static const conf_parser_t config[]
Definition base.c:183
#define fr_assert(_expr)
Definition rad_assert.h:38
#define DEBUG2(fmt,...)
Definition radclient.h:43
static fr_app_io_t app_io
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition rand.c:105
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition rb.c:824
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:246
int fr_rb_flatten_inorder(TALLOC_CTX *ctx, void **out[], fr_rb_tree_t *tree)
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
Definition log.h:96
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
#define talloc_get_type_abort_const
Definition talloc.h:282
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:255
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_delta_ispos(_a)
Definition time.h:290
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
close(uq->fd)
static fr_event_list_t * el
static fr_slen_t parent
Definition pair.h:845
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:733
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1274
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1609
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1636
A worker which takes packets from a master, and processes them.
Definition worker.c:94