The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
network.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: f4a48a9872e49ed6c57cb2b7367c0cd34d38a022 $
19 *
20 * @brief Receiver of socket data, which sends messages to the workers.
21 * @file io/network.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: f4a48a9872e49ed6c57cb2b7367c0cd34d38a022 $")
26
/*
 *	Both macros expand to fields of a local "nr" (fr_network_t *).
 *	They are presumably consumed by the logging macros (DEBUG3(), ERROR(),
 *	PERROR(), ...), which would explain why functions in this file declare
 *	"nr" even when they only use it for logging — confirm against the log
 *	headers.
 */
27#define LOG_PREFIX nr->name
28
29#define LOG_DST nr->log
30
31#include <freeradius-devel/util/event.h>
32#include <freeradius-devel/util/misc.h>
33#include <freeradius-devel/util/rand.h>
34#include <freeradius-devel/util/rb.h>
35#include <freeradius-devel/util/syserror.h>
36#include <freeradius-devel/util/atexit.h>
37#include <freeradius-devel/util/talloc.h>
38
39#include <freeradius-devel/io/channel.h>
40#include <freeradius-devel/io/control.h>
41#include <freeradius-devel/io/listen.h>
42#include <freeradius-devel/io/network.h>
43#include <freeradius-devel/io/queue.h>
44#include <freeradius-devel/io/ring_buffer.h>
45#include <freeradius-devel/io/worker.h>
46
/*
 *	Maximum number of workers.  Presumably sizes the per-network worker
 *	array — confirm against the fr_network_t definition.
 */
47#define MAX_WORKERS 64
48
/*
 *	Per-thread cache of the control-plane ring buffer.  fr_network_rb_init()
 *	returns this directly once it is set for the calling thread.
 */
49static _Thread_local fr_ring_buffer_t *fr_network_rb;
50
57
58/** Associate a worker thread with a network thread
59 *
60 */
61typedef struct {
62 fr_heap_index_t heap_id; //!< workers are in a heap
63 fr_time_delta_t cpu_time; //!< how much CPU time this worker has spent
64 fr_time_delta_t predicted; //!< predicted processing time for one packet
65
66 bool blocked; //!< is this worker blocked?
67
68 fr_channel_t *channel; //!< channel to the worker
69 fr_worker_t *worker; //!< worker pointer
72
73typedef struct {
74 fr_rb_node_t listen_node; //!< rbtree node for looking up by listener.
75 fr_rb_node_t num_node; //!< rbtree node for looking up by number.
76
77 fr_network_t *nr; //!< O(N) issues in talloc
78 int number; //!< unique ID
79 fr_heap_index_t heap_id; //!< for the sockets_by_num heap
80
81 fr_event_filter_t filter; //!< what type of filter it is
82
83 bool dead; //!< is it dead?
84 bool blocked; //!< is it blocked?
85
86 unsigned int outstanding; //!< number of outstanding packets sent to the worker
87 fr_listen_t *listen; //!< I/O ctx and functions.
88
89 fr_message_set_t *ms; //!< message buffers for this socket.
90 fr_channel_data_t *cd; //!< cached in case of allocation & read error
91 size_t leftover; //!< leftover data from a previous read
92 size_t written; //!< however much we did in a partial write
93
94 fr_channel_data_t *pending; //!< the currently pending partial packet
95 fr_heap_t *waiting; //!< packets waiting to be written
98
99/*
100 * We have an array of workers, so we can index the workers in
101 * O(1) time, instead of keeping a heap of "workers ordered by CPU
102 * time". When we send a packet to a worker, we just update its
103 * predicted CPU time in place. When we receive a reply from a
104 * worker, we likewise update its predicted CPU time in place.
105 *
106 * When we need to choose a worker, we pick 2 at random, and then
107 * choose the one with fewer outstanding requests, breaking ties by
 * lower CPU time. For background, see
108 * "Power of Two-Choices" and
109 * https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
110 * https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
111 */
113 char const *name; //!< Network ID for logging.
114
115 pthread_t thread_id; //!< for self
116
117 bool suspended; //!< whether or not we're suspended.
118
119 fr_log_t const *log; //!< log destination
120 fr_log_lvl_t lvl; //!< debug log level
121
122 fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
123
124 fr_control_t *control; //!< the control plane
125
126 fr_ring_buffer_t *rb; //!< ring buffer for my control-plane messages
127
128 fr_event_list_t *el; //!< our event list
129
130 fr_heap_t *replies; //!< replies from the worker, ordered by priority / origin time
131
133
134 fr_rb_tree_t *sockets; //!< list of sockets we're managing, ordered by the listener
135 fr_rb_tree_t *sockets_by_num; //!< ordered by number;
136
137 int num_workers; //!< number of active workers
138 int num_blocked; //!< number of blocked workers
139 int num_pending_workers; //!< number of workers we're waiting to start.
140 int max_workers; //!< maximum number of allowed workers
141 int num_sockets; //!< actually a counter...
142
143 int signal_pipe[2]; //!< Pipe for signalling the worker in an orderly way.
144 ///< This is more deterministic than using async signals.
145
146 bool exiting; //!< are we exiting?
147
148 fr_network_config_t config; //!< configuration
150};
151
/*
 *	Forward declarations.  fr_network_read() in particular is called by
 *	fr_network_listen_read() and fr_network_listen_inject() before its
 *	definition further down.
 */
152static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx);
153static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx);
155static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx);
156
157static int8_t reply_cmp(void const *one, void const *two)
158{
159 fr_channel_data_t const *a = one, *b = two;
160 int ret;
161
162 ret = CMP(a->priority, b->priority);
163 if (ret != 0) return ret;
164
165 return fr_time_cmp(a->m.when, b->m.when);
166}
167
168static int8_t waiting_cmp(void const *one, void const *two)
169{
170 fr_channel_data_t const *a = one, *b = two;
171 int ret;
172
173 ret = CMP(a->priority, b->priority);
174 if (ret != 0) return ret;
175
176 return fr_time_cmp(a->reply.request_time, b->reply.request_time);
177}
178
179static int8_t socket_listen_cmp(void const *one, void const *two)
180{
181 fr_network_socket_t const *a = one, *b = two;
182
183 return CMP(a->listen, b->listen);
184}
185
186static int8_t socket_num_cmp(void const *one, void const *two)
187{
188 fr_network_socket_t const *a = one, *b = two;
189
190 return CMP(a->number, b->number);
191}
192
/** Free the network ring buffer
 *
 * The memory is released explicitly so that leak checkers such as
 * valgrind do not complain about it.
 *
 * @param[in] arg	the ring buffer to free.
 * @return the result of talloc_free().
 */
static int _fr_network_rb_free(void *arg)
{
	void *rb = arg;

	return talloc_free(rb);
}
201
202/** Initialise thread local storage
 *
 * Returns the calling thread's control-plane ring buffer.  The cached
 * fr_network_rb is returned if it is already set; otherwise a new ring
 * buffer is allocated.
203 *
204 * @return fr_ring_buffer_t for messages
 *	- NULL if the ring buffer could not be allocated (an error is
 *	  printed with fr_perror()).
205 */
207{
209
 /*
  *	Fast path: this thread already has a ring buffer.
  */
210 rb = fr_network_rb;
211 if (rb) return rb;
212
214 if (!rb) {
215 fr_perror("Failed allocating memory for network ring buffer");
216 return NULL;
217 }
218
220
221 return rb;
222}
223
224static inline bool is_network_thread(fr_network_t const *nr)
225{
226 return (pthread_equal(pthread_self(), nr->thread_id) != 0);
227}
228
230
231/** Add a fr_listen_t to a network
 *
 * When called from the network's own thread, and the listener does not
 * need full setup, the listener is added directly via
 * fr_network_listen_add_self().  Otherwise the listener pointer is sent
 * to the network thread as an FR_CONTROL_ID_LISTEN control message.
232 *
233 * @param nr the network
234 * @param li the listener
 * @return
 *	- -1 if this thread's control ring buffer could not be obtained.
 *	- otherwise the result of fr_network_listen_add_self() or
 *	  fr_control_message_send().
235 */
237{
239
240 /*
241 * Associate the protocol dictionary with the listener, so that the decode functions can check /
242 * use it.
243 *
244 * A virtual server may start off with a "dictionary" block, and therefore define a local
245 * dictionary. So the "root" dictionary of a virtual server may not be a protocol dict.
246 */
247 fr_assert(li->server_cs != NULL);
249
250 fr_assert(li->dict != NULL);
251
252 /*
253 * Skip a bunch of work if we're already in the network thread.
254 */
255 if (is_network_thread(nr) && !li->needs_full_setup) {
256 return fr_network_listen_add_self(nr, li);
257 }
258
259 rb = fr_network_rb_init();
260 if (!rb) return -1;
261
 /*
  *	Only the pointer is copied into the message (&li, sizeof(li)), so
  *	the listener presumably has to stay valid until the network thread
  *	has processed the message.
  */
262 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
263}
264
265
266/** Delete a socket from a network. MUST be called only by the listener itself!
267 *
 * The socket is found by looking @p li up in nr->sockets.  The action
 * taken on the found socket is presumably fr_network_socket_dead() —
 * confirm.
 *
268 * @param nr the network
269 * @param li the listener
 * @return
 *	- -1 if @p li is not known to this network.
 *	- 0 otherwise.
270 */
272{
274
276
277 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
278 if (!s) return -1;
279
281
282 return 0;
283}
284
285/** Add a "watch directory" call to a network
 *
 * Always goes through the control plane.  The listener pointer is sent to
 * the network thread as an FR_CONTROL_ID_DIRECTORY message, even when the
 * caller is already on the network thread.
286 *
287 * @param nr the network
288 * @param li the listener
 * @return
 *	- -1 if this thread's control ring buffer could not be obtained.
 *	- otherwise the result of fr_control_message_send().
289 */
291{
293
294 rb = fr_network_rb_init();
295 if (!rb) return -1;
296
297 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_DIRECTORY, &li, sizeof(li));
298}
299
300/** Add a worker to a network in a different thread
 *
 * Sends the worker pointer to the network thread as an
 * FR_CONTROL_ID_WORKER control message.  Both pointers are type-checked
 * with talloc_get_type_abort() before the message is sent.
301 *
302 * @param nr the network
303 * @param worker the worker
 * @return
 *	- -1 if this thread's control ring buffer could not be obtained.
 *	- otherwise the result of fr_control_message_send().
304 */
306{
308
309 rb = fr_network_rb_init();
310 if (!rb) return -1;
311
312 (void) talloc_get_type_abort(nr, fr_network_t);
313 (void) talloc_get_type_abort(worker, fr_worker_t);
314
315 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_WORKER, &worker, sizeof(worker));
316}
317
/*
 *	Forward declaration, needed by fr_network_worker_add_self() below.
 */
318static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now);
319
320/** Add a worker to a network in the same thread
 *
 * Skips the control plane and calls fr_network_worker_started_callback()
 * directly.  The worker pointer is passed by address together with its
 * size, matching the callback's (ctx, data, data_size, now) shape as
 * declared above.  "now" is passed as a zero time.
321 *
322 * @param nr the network
323 * @param worker the worker
324 */
326{
327 fr_network_worker_started_callback(nr, &worker, sizeof(worker), fr_time_wrap(0));
328}
329
330
331/** Signal the network to read from a listener
 *
 * Looks up the socket for @p li and calls fr_network_read() on it
 * directly, in the caller's thread.  Listeners that are not known to this
 * network are silently ignored.
 *
 * NOTE(review): unlike fr_network_listen_inject(), there is no
 * is_network_thread() check here.  This presumably must only be called
 * from the network thread — confirm with callers.
332 *
333 * @param nr the network
334 * @param li the listener to read from
335 */
337{
339
340 (void) talloc_get_type_abort(nr, fr_network_t);
342
343 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
344 if (!s) return;
345
346 /*
347 * Go read the socket.
348 */
349 fr_network_read(nr->el, s->listen->fd, 0, s);
350}
351
352
353/** Inject a packet for a listener to write
354 *
 * The packet is not written immediately.  It is wrapped in a channel data
 * entry with PRIORITY_NOW, localized, and queued on nr->replies alongside
 * worker replies.  Failures (localization or heap insertion) are silent,
 * because the function has no return value.
 *
355 * @param nr the network
356 * @param li the listener where the packet is being injected
357 * @param packet the packet to be written
358 * @param packet_len the length of the packet
359 * @param packet_ctx The packet context to write
360 * @param request_time when the packet was received.
361 */
362void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len,
363 void *packet_ctx, fr_time_t request_time)
364{
365 fr_message_t *lm;
367
368 cd = (fr_channel_data_t) {
369 .m = (fr_message_t) {
371 .data_size = packet_len,
372 .when = request_time,
373 },
374
375 .channel = {
376 .heap_id = FR_HEAP_INDEX_INVALID,
377 },
378
379 .listen = li,
380 .priority = PRIORITY_NOW,
381 .reply.request_time = request_time,
382 };
383
 /*
  *	The pointers are stored with memcpy() rather than assignment to
  *	avoid const-qualification warnings.  NOTE(review): packet_ctx is
  *	already a non-const "void *", so the second memcpy() looks
  *	unnecessary unless cd.packet_ctx has a different type — confirm.
  */
384 memcpy(&cd.m.data, &packet, sizeof(packet)); /* const issues */
385 memcpy(&cd.packet_ctx, &packet_ctx, sizeof(packet_ctx)); /* const issues */
386
387 /*
388 * Localize the message and insert it into the heap of pending messages.
 *
 * cd lives on the stack, so the whole fr_channel_data_t (sizeof(cd),
 * not just the fr_message_t) is presumably copied into memory owned by
 * nr so that it outlives this call.  lm points at the copy's leading
 * fr_message_t; elsewhere in this file such pointers are cast back to
 * fr_channel_data_t.
389 */
390 lm = fr_message_localize(nr, &cd.m, sizeof(cd));
391 if (!lm) return;
392
393 if (fr_heap_insert(&nr->replies, lm) < 0) {
394 fr_message_done(lm);
395 }
396}
397
398
399/** Inject a packet for a listener to read
 *
 * On the network thread, the packet is handed straight to the listener's
 * app_io->inject() and then read via fr_network_read().  From any other
 * thread, a talloc copy of the packet is sent to the network thread as an
 * FR_CONTROL_ID_INJECT control message.
400 *
401 * @param nr the network
402 * @param li the listener where the packet is being injected
403 * @param packet the packet to be injected
404 * @param packet_len the length of the packet
405 * @param recv_time when the packet was received.
406 * @return
407 * - <0 on error
408 * - 0 on success
409 */
410int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
411{
413 fr_network_inject_t my_inject;
414
415 /*
416 * Can't inject to injection-less destinations.
417 */
418 if (!li->app_io->inject) {
419 fr_strerror_const("Listener cannot accept injected packet");
420 return -1;
421 }
422
423 /*
424 * Avoid a bounce through the event loop if we're being called from the network thread.
425 */
426 if (is_network_thread(nr)) {
428
429 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
430 if (!s) {
431 fr_strerror_const("Listener was not found for injected packet");
432 return -1;
433 }
434
435 /*
436 * Inject the packet. The master.c mod_read() routine will then take care of avoiding
437 * IO, and instead return the packet to the network side.
438 */
439 if (li->app_io->inject(li, packet, packet_len, recv_time) == 0) {
440 (void) fr_network_read(nr->el, li->fd, 0, s);
441 }
442
 /*
  * NOTE(review): 0 is returned even when inject() fails, so
  * callers cannot tell that the packet was never read.
  */
443 return 0;
444 }
445
446 rb = fr_network_rb_init();
447 if (!rb) return -1;
448
 /*
  * The packet is copied because it crosses threads.
  * NOTE(review): the talloc_memdup() result is not checked for NULL,
  * and no visible code frees the copy if fr_control_message_send()
  * fails — it presumably leaks on that path unless the send takes
  * ownership; confirm.
  */
449 my_inject.listen = li;
450 my_inject.packet = talloc_memdup(NULL, packet, packet_len);
451 my_inject.packet_len = packet_len;
452 my_inject.recv_time = recv_time;
453
454 return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
455}
456
459 { 0 }
460};
461
464 { 0 }
465};
466
469 { 0 }
470};
471
474 { 0 }
475};
476
/*
 *	Suspend the network.  Visits every socket in nr->sockets (in order)
 *	and then marks the network suspended.  Calling it while already
 *	suspended is a no-op.  The per-socket action is presumably an event
 *	filter update that pauses reading — confirm.
 */
478{
481
482 if (nr->suspended) return;
483
484 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
485 s != NULL;
486 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
488 }
489 nr->suspended = true;
490}
491
/*
 *	Undo a suspend.  Visits every socket in nr->sockets (in order) and
 *	then clears the suspended flag.  Calling it while not suspended is a
 *	no-op.  The per-socket action presumably re-enables reading — confirm.
 */
493{
496
497 if (!nr->suspended) return;
498
499 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
500 s != NULL;
501 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
503 }
504 nr->suspended = false;
505}
506
/*
 *	RTT() is an integer exponentially-weighted moving average over
 *	fr_time_delta_t values.  The new sample is weighted 1/IALPHA and the
 *	old estimate (IALPHA - 1)/IALPHA, i.e. 1/8 and 7/8 with IALPHA of 8.
 *	The final step is an integer division, so the result is truncated.
 */
507#define IALPHA (8)
508#define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
509
510/** Callback which handles a message being received on the network side.
 *
 * Updates the sending worker's statistics and CPU-time estimates, clears
 * its "blocked" state, and queues the reply on nr->replies.
511 *
512 * @param[in] ctx the network
513 * @param[in] ch the channel that the message is on.
514 * @param[in] cd the reply message. It is dereferenced unconditionally, so it must not be NULL.
515 */
517{
518 fr_network_t *nr = ctx;
519 fr_network_worker_t *worker;
520
521 cd->channel.ch = ch;
522
523 /*
524 * Update stats for the worker.
 *
 * stats.out counts replies (see OUTSTANDING()).  cpu_time is replaced
 * by the value the worker reported, discarding the projection that
 * fr_network_send_request() added.  The per-packet prediction is
 * seeded with the first processing time, then smoothed with RTT().
525 */
527 worker->stats.out++;
528 worker->cpu_time = cd->reply.cpu_time;
529 if (!fr_time_delta_ispos(worker->predicted)) {
530 worker->predicted = cd->reply.processing_time;
531 } else {
532 worker->predicted = RTT(worker->predicted, cd->reply.processing_time);
533 }
534
535 /*
536 * Unblock the worker.
 *
 * A reply presumably means the worker is servicing its channel again.
537 */
538 if (worker->blocked) {
539 worker->blocked = false;
540 nr->num_blocked--;
542 }
543
544 /*
545 * Ensure that heap insert works.
 *
 * heap_id is reset so the entry is presumably not treated as
 * already being a member of a heap.
546 */
547 cd->channel.heap_id = FR_HEAP_INDEX_INVALID;
548 if (fr_heap_insert(&nr->replies, cd) < 0) {
549 fr_message_done(&cd->m);
550 fr_assert(0 == 1);
551 }
552}
553
554/** Handle a network control message callback for a channel
555 *
556 * This is called from the event loop when we get a notification
557 * from the event signalling pipe.
 *
 * - FR_CHANNEL_ERROR and FR_CHANNEL_EMPTY return immediately.
 * - One data-ready event drains every pending reply from the channel
 *   with fr_channel_recv_reply().
 * - FR_CHANNEL_CLOSE (the worker acknowledging our close request)
 *   removes the worker from nr->workers.
 * - FR_CHANNEL_OPEN, and one other event, are not expected here and only
 *   assert.
558 *
559 * @param[in] ctx the network
560 * @param[in] data the message
561 * @param[in] data_size size of the data
562 * @param[in] now the current time
563 */
564static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
565{
567 fr_channel_t *ch;
568 fr_network_t *nr = ctx;
569
570 ce = fr_channel_service_message(now, &ch, data, data_size);
571 DEBUG3("Channel %s",
572 fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
 /*
  * NOTE(review): there is no default case, so any other event value
  * is silently ignored.
  */
573 switch (ce) {
574 case FR_CHANNEL_ERROR:
575 return;
576
577 case FR_CHANNEL_EMPTY:
578 return;
579
580 case FR_CHANNEL_NOOP:
581 break;
582
584 fr_assert(ch != NULL);
 /* Drain every reply currently queued on the channel. */
585 while (fr_channel_recv_reply(ch));
586 break;
587
589 fr_assert(0 == 1);
590 break;
591
592 case FR_CHANNEL_OPEN:
593 fr_assert(0 == 1);
594 break;
595
596 case FR_CHANNEL_CLOSE:
597 {
598 fr_network_worker_t *w = talloc_get_type_abort(fr_channel_requestor_uctx_get(ch),
600 int i;
601
602 /*
603 * Remove this worker from the array
 *
 * The array is kept dense: later entries are shifted down one
 * slot and the now-unused last slot is set to NULL.  If w is
 * the last entry, nothing needs to move.
 *
 * NOTE(review): num_workers is decremented even if w is not
 * found in the array (or if num_workers is already 0), which
 * would silently drop another worker's slot or index -1.
 * Consider asserting that w was found.
604 */
605 DEBUG3("Worker acked our close request");
606 for (i = 0; i < nr->num_workers; i++) {
607 if (nr->workers[i] == w) {
608 if (i == (nr->num_workers - 1)) break;
609
610 /*
611 * Close the hole...
612 */
613 memmove(&nr->workers[i], &nr->workers[i + 1],
614 (uint8_t *) &nr->workers[nr->num_workers] - (uint8_t *) &nr->workers[i + 1]);
615 break;
616 }
617 }
618 nr->num_workers--;
619 nr->workers[nr->num_workers] = NULL; /* over-write now unused pointer */
620 }
621 break;
622 }
623}
624
/*
 *	Requests handed to a worker for which no reply has been received yet.
 *	stats.in is incremented when a request is sent, and stats.out when a
 *	reply comes back.
 */
625#define OUTSTANDING(_x) ((_x)->stats.in - (_x)->stats.out)
626
627/** Send a message on the "best" channel.
628 *
 * How the worker is chosen:
 * - With one worker, that worker is used unless it is blocked.
 * - With several workers and none blocked, two distinct workers are
 *   picked at random.  The one with fewer outstanding requests is used,
 *   with lower cpu_time as the tie-break.
 * - If some workers are blocked, every unblocked worker is scanned with
 *   the same criteria.
 *
 * If sending on the chosen channel fails, that worker is marked blocked
 * and selection is retried until every worker is blocked.
 *
629 * @param nr the network
630 * @param cd the message we've received
 * @return
 *	- -1 if the message could not be sent to any worker.
 *	- 0 on success.
631 */
633{
634 fr_network_worker_t *worker;
635
636 (void) talloc_get_type_abort(nr, fr_network_t);
637
638 if (!nr->num_workers) {
639 RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
640 "No workers are available");
641 return -1;
642 }
643
644retry:
645 if (nr->num_workers == 1) {
646 worker = nr->workers[0];
647 if (worker->blocked) {
648 RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
649 "In single-threaded mode and worker is blocked");
 /* Also reached via "goto drop" when max_outstanding is hit. */
650 drop:
651 worker->stats.dropped++;
652 return -1;
653 }
654
655 } else if (nr->num_blocked == 0) {
656 int64_t cmp;
657 uint32_t one, two;
658
 /*
  * This branch is only reached with num_workers >= 2, because 0 and
  * 1 are handled above.  So a distinct "two" always exists and the
  * loop terminates.
  */
659 one = fr_rand() % nr->num_workers;
660 do {
661 two = fr_rand() % nr->num_workers;
662 } while (two == one);
663
664 /*
665 * Choose a worker based on minimizing the amount
666 * of future work it's being asked to do.
667 *
668 * If both workers have the same number of
669 * outstanding requests, then choose the worker
670 * which has used the least total CPU time.
 *
 * NOTE(review): if the stats counters are unsigned, this
 * difference wraps and relies on the conversion to int64_t
 * to come out negative.
671 */
672 cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
673 if (cmp < 0) {
674 worker = nr->workers[one];
675
676 } else if (cmp > 0) {
677 worker = nr->workers[two];
678
679 } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
680 worker = nr->workers[one];
681
682 } else {
683 worker = nr->workers[two];
684 }
685 } else {
686 int i;
687 uint64_t min_outstanding = UINT64_MAX;
688 fr_network_worker_t *found = NULL;
689
690 /*
691 * Some workers are blocked. Pick the worker
692 * with the least amount of future work to do.
693 */
694 for (i = 0; i < nr->num_workers; i++) {
695 uint64_t outstanding;
696
697 worker = nr->workers[i];
698 if (worker->blocked) continue;
699
700 outstanding = OUTSTANDING(worker);
701 if ((outstanding < min_outstanding) || !found) {
702 found = worker;
703 min_outstanding = outstanding;
704
705 } else if (outstanding == min_outstanding) {
706 /*
707 * Queue lengths are the same.
708 * Choose this worker if it's
709 * less busy than the previous one we found.
710 */
711 if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
712 found = worker;
713 }
714 }
715 }
716
 /*
  * NOTE(review): num_blocked / num_workers are ints but are printed
  * with %u below; %d would match.
  */
717 if (!found) {
718 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - Couldn't find active worker, "
719 "%u/%u workers are blocked", nr->num_blocked, nr->num_workers);
720 return -1;
721 }
722
723 worker = found;
724 }
725
726 (void) talloc_get_type_abort(worker, fr_network_worker_t);
727
728 /*
729 * Too many outstanding packets for this worker. Drop
730 * the request.
731 *
732 * If the worker we've picked has too many outstanding
733 * packets, then we have either only one worker, in which
734 * case we should drop the packet. Or, we were unable to
735 * find a worker with smaller than max_outstanding
736 * packets. In which case all of the workers are likely
737 * at max_outstanding.
738 *
739 * In both cases, we should just drop the new packet.
740 */
741 fr_assert(worker->stats.in >= worker->stats.out);
742 if (nr->config.max_outstanding &&
743 (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
744 RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
745 goto drop;
746 }
747
748 /*
749 * Send the message to the channel. If we fail, drop the
750 * packet. The only reason for failure is that the
751 * worker isn't servicing its input queue. When that
752 * happens, we have no idea what to do, and the whole
753 * thing falls over.
 *
 * In practice the worker is marked blocked and the send is
 * retried on another unblocked worker.  -1 is returned only once
 * every worker is blocked.
 *
 * NOTE(review): stats.dropped is incremented here even when the
 * retry later succeeds on another worker, so it over-counts drops.
754 */
755 if (fr_channel_send_request(worker->channel, cd) < 0) {
756 worker->stats.dropped++;
757 worker->blocked = true;
758 nr->num_blocked++;
759
760 RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - %u/%u workers are blocked",
761 nr->num_blocked, nr->num_workers);
762
763 if (nr->num_blocked == nr->num_workers) {
765 return -1;
766 }
767 goto retry;
768 }
769
770 worker->stats.in++;
771
772 /*
773 * We're projecting that the worker will use more CPU
774 * time to process this request. The CPU time will be
775 * updated with a more accurate number when we receive a
776 * reply from this channel.
777 */
778 worker->cpu_time = fr_time_delta_add(worker->cpu_time, worker->predicted);
779
780 return 0;
781}
782
783
784/** Send a packet to the worker.
785 *
786 * MUST only be called from the network thread.
787 *
788 * @param nr the network
789 * @param parent the parent listener
790 * @param li the listener that the packet was "read" from. Can be "parent"
791 * @param buffer the packet to send
792 * @param buflen size of the packet to send
793 * @param recv_time of the packet
794 * @param packet_ctx for the packet
795 * @return
796 * - <0 on error
797 * - 0 on success
798 */
800 const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
801{
804
805 (void) talloc_get_type_abort(nr, fr_network_t);
807
808 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
809 if (!s) return -1;
810
811 cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, buflen);
812 if (!cd) return -1;
813
814 cd->listen = parent;
816 cd->packet_ctx = packet_ctx;
817 cd->request.recv_time = recv_time;
818 memcpy(cd->m.data, buffer, buflen);
819 cd->m.when = fr_time();
820
821 if (fr_network_send_request(nr, cd) < 0) {
823 fr_message_done(&cd->m);
824 nr->stats.dropped++;
825 s->stats.dropped++;
826 return -1;
827 }
828
829 s->outstanding++;
830 return 0;
831}
832
833/** Get the number of outstanding packets
834 *
835 * @param nr the network
836 * @param li the listener that the packet was "read" from
837 * @return
838 * - -1 if @p li is not known to this network
839 * - the number of outstanding packets
840*/
843
844 (void) talloc_get_type_abort(nr, fr_network_t);
846
847 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
848 if (!s) return -1;
849
 /*
  * NOTE(review): s->outstanding is an unsigned int; it is converted to
  * this function's (presumably signed) return type.
  */
850 return s->outstanding;
851}
852
853/*
854 * Mark it as dead, but DON'T free it until all of the replies
855 * have come in.
 *
 * Calling this on a socket that is already dead does nothing.
 * Otherwise:
 * - the socket's fd is removed from the event loop;
 * - every worker is asked, via fr_worker_listen_cancel(), to cancel
 *   work for this listener;
 * - the socket is freed immediately if nothing is outstanding.
856 */
858{
859 int i;
860
861 if (s->dead) return;
862
863 s->dead = true;
864
865 /*
866 * This FD is no longer part of the event loop.
867 */
868 fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
869
 /*
  * Unused worker slots are NULL (FR_CHANNEL_CLOSE handling clears the
  * freed slot).  Presumably max_workers never exceeds the size of the
  * workers array — confirm.
  */
870 for (i = 0; i < nr->max_workers; i++) {
871 if (!nr->workers[i]) continue;
872
873 (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
874 }
875
876 /*
877 * If there are no outstanding packets, then we can free
878 * it now.
879 */
880 if (!s->outstanding) {
881 talloc_free(s);
882 return;
883 }
884
885 /*
886 * There are still outstanding packets. Leave it in the
887 * socket tree, so that replies from the worker can find
888 * it. When we've received all of the replies, then
889 * fr_network_post_event() will clean up this socket.
890 */
891}
892
893/** Read a packet from the network.
 *
 * Reads one packet via the listener's app_io->read() and hands it to a
 * worker.  Reading continues in the same call only while the transport
 * reports leftover bytes (s->leftover), and stops after at most 17
 * packets: num_messages counts from 0 and reading stops once it exceeds
 * 16.  This lets other sockets get serviced.
 *
 * A channel data buffer that is not yet used for a complete packet is
 * cached in s->cd for the next call.  Packets that cannot be dispatched
 * are freed and counted as dropped.
894 *
895 * @param[in] el the event list.
896 * @param[in] sockfd the socket which is ready to read.
897 * @param[in] flags from kevent.
898 * @param[in] ctx the network socket context.
899 */
900static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
901{
902 int num_messages = 0;
903 fr_network_socket_t *s = ctx;
904 fr_network_t *nr = s->nr;
905 ssize_t data_size;
906 fr_channel_data_t *cd, *next;
907
 /*
  * NOTE(review): the fds are ints but are printed with %u here and in
  * the DEBUG3() calls below; %d would match.
  */
908 if (!fr_cond_assert_msg(s->listen->fd == sockfd, "Expected listen->fd (%u) to be equal event fd (%u)",
909 s->listen->fd, sockfd)) return;
910
911 DEBUG3("Reading data from FD %u", sockfd);
912
913 if (!s->cd) {
915 if (!cd) {
916 ERROR("Failed allocating message size %zd! - Closing socket",
919 return;
920 }
921 } else {
922 cd = s->cd;
923 }
924
925 fr_assert(cd->m.data != NULL);
926
927next_message:
928 /*
929 * Poll this socket, but not too often. We have to go
930 * service other sockets, too.
931 */
932 if (num_messages > 16) {
933 s->cd = cd;
934 return;
935 }
936
938
939 /*
940 * Read data from the network.
941 *
942 * Return of 0 means "no data", which is fine for UDP.
943 * For TCP, if an underlying read() on the TCP socket
944 * returns 0, (which signals that the FD is no longer
945 * usable) this function should return -1, so that the
946 * network side knows that it needs to close the
947 * connection.
948 */
949 data_size = s->listen->app_io->read(s->listen, &cd->packet_ctx, &cd->request.recv_time,
950 cd->m.data, cd->m.rb_size, &s->leftover);
951 if (data_size == 0) {
952 /*
953 * Cache the message for later. This is
954 * important for stream sockets, which can do
955 * partial reads into the current buffer. We
956 * need to be able to give the same buffer back
957 * to the stream socket for subsequent reads.
958 *
959 * Since we have a message set for each
960 * fr_io_socket_t, no "head of line"
961 * blocking issues can happen for stream sockets.
962 */
963 s->cd = cd;
964 return;
965 }
966
967 /*
968 * Error: close the connection, and remove the fr_listen_t
969 */
970 if (data_size < 0) {
971// fr_log(nr->log, L_DBG_ERR, "error from transport read on socket %d", sockfd);
973 return;
974 }
 /* cd now holds a complete packet, so it is no longer cached on the socket. */
975 s->cd = NULL;
976
977 DEBUG3("Read %zd byte(s) from FD %u", data_size, sockfd);
978 if (s->listen->read_hexdump) HEXDUMP2(cd->m.data, data_size, "%s read ", s->listen->name);
979 nr->stats.in++;
980 s->stats.in++;
981
982 /*
983 * Initialize the rest of the fields of the channel data.
984 *
985 * We always use "now" as the time of the message, as the
986 * packet MAY be a duplicate packet magically resurrected
987 * from the past. i.e. If the read routines are doing
988 * dedup, then they notice that the packet is a
989 * duplicate. In that case, they send over a copy of the
990 * packet, BUT with the original timestamp. This
991 * information tells the worker that the packet is a
992 * duplicate.
993 */
994 cd->m.when = fr_time();
995 cd->listen = s->listen;
996
997 /*
998 * Nothing in the buffer yet. Allocate room for one
999 * packet.
1000 */
1001 if ((cd->m.data_size == 0) && (!s->leftover)) {
1002
1003 (void) fr_message_alloc(s->ms, &cd->m, data_size);
1004 next = NULL;
1005
1006 } else {
1007 /*
1008 * There are leftover bytes in the buffer, feed
1009 * them to the next round of reading.
1010 */
1011 if (s->leftover) {
1012 next = (fr_channel_data_t *) fr_message_alloc_reserve(s->ms, &cd->m, data_size, s->leftover,
1014 if (!next) {
 /*
  * NOTE(review): if fr_assert() is compiled out in release
  * builds, processing continues with next == NULL and the
  * leftover bytes are presumably lost — confirm.
  */
1015 PERROR("Failed reserving partial packet.");
1016 // @todo - probably close the socket...
1017 fr_assert(0 == 1);
1018 }
1019 } else {
1020 next = NULL;
1021 }
1022 }
1023
1024 /*
1025 * Set the priority. Which incidentally also checks if
1026 * we're allowed to read this particular kind of packet.
1027 *
1028 * That check is because the app_io handlers just read
1029 * packets, and don't really have access to the parent
1030 * "list of allowed packet types". So we have to do the
1031 * work here in a callback.
1032 *
1033 * That should probably be fixed...
1034 */
1035 if (s->listen->app->priority) {
1036 int priority;
1037
 /* A priority of 0 or less means the packet is not allowed: discard it. */
1038 priority = s->listen->app->priority(s->listen->app_instance, cd->m.data, data_size);
1039 if (priority <= 0) goto discard;
1040
1041 cd->priority = priority;
1042 }
1043
 /*
  * The "discard" label is reached both when no worker accepts the
  * packet and, via goto, when the app rejects its priority.
  */
1044 if (fr_network_send_request(nr, cd) < 0) {
1045 discard:
1046 talloc_free(cd->packet_ctx); /* not sure what else to do here */
1047 fr_message_done(&cd->m);
1048 nr->stats.dropped++;
1049 s->stats.dropped++;
1050
1051 } else {
1052 /*
1053 * One more packet sent to a worker.
1054 */
1055 s->outstanding++;
1056 }
1057
1058 /*
1059 * If there is a next message, go read it from the buffer.
1060 *
1061 * @todo - note that this calls read(), even if the
1062 * app_io has paused the reader. We likely want to be
1063 * able to check that, too. We might just remove this
1064 * "goto"...
1065 */
1066 if (next) {
1067 cd = next;
1068 num_messages++;
1069 goto next_message;
1070 }
1071}
1072
/** Send already-read packet data from a listener to a worker
 *
 * Copies @p data into a message from the listener's message set and hands
 * it to fr_network_send_request().  Unlike fr_network_read(), the message
 * time (cd->m.when) is set to @p recv_time rather than the current time.
 *
 * NOTE(review): @p packet_ctx is freed only when the send to a worker
 * fails.  If the listener is unknown or the message cannot be allocated,
 * it is left to the caller.  Confirm that callers handle both cases.
 *
 * @param nr		the network
 * @param li		the listener the data belongs to
 * @param packet_ctx	packet context for the message
 * @param data		packet data to copy
 * @param data_len	length of @p data
 * @param recv_time	time to store as the message time
 * @return
 *	- -1 if @p li is unknown, the message cannot be allocated, or no
 *	  worker accepted it.
 *	- 0 on success.
 */
1073int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
1074{
1077
1078 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1079 if (!s) return -1;
1080
1081 cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, data_len);
1082 if (!cd) return -1;
1083
1084 s->stats.in++;
1085
1087
1088 cd->m.when = recv_time;
1089 cd->listen = li;
1090 cd->packet_ctx = packet_ctx;
1091
1092 memcpy(cd->m.data, data, data_len);
1093
1094 if (fr_network_send_request(nr, cd) < 0) {
1095 talloc_free(packet_ctx);
1096 fr_message_done(&cd->m);
1097 nr->stats.dropped++;
1098 s->stats.dropped++;
1099 return -1;
1100 }
1101
1102 /*
1103 * One more packet sent to a worker.
1104 */
1105 s->outstanding++;
1106 return 0;
1107}
1108
1109
1110/** Get a notification that a vnode changed
1111 *
1112 * @param[in] el the event list.
1113 * @param[in] sockfd the socket which is ready to read.
1114 * @param[in] fflags from kevent.
1115 * @param[in] ctx the network socket context.
1116 */
1117static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
1118{
1119 fr_network_socket_t *s = ctx;
1120 fr_network_t *nr = s->nr;
1121
1122 if (!fr_cond_assert(s->listen->fd == sockfd)) return;
1123
1124 DEBUG3("network vnode");
1125
1126 /*
1127 * Tell the IO handler that something has happened to the
1128 * file.
1129 */
1130 s->listen->app_io->vnode(s->listen, fflags);
1131}
1132
1133
1134/** Handle errors for a socket.
 *
 * If the listener's app_io provides an error() handler, it is called
 * instead of logging.  Otherwise, a peer close (EV_EOF in @p flags) is
 * logged at DEBUG2, and any other error is logged at ERROR together with
 * @p fd_errno.
1135 *
1136 * @param[in] el the event list
1137 * @param[in] sockfd the socket which has a fatal error.
1138 * @param[in] flags returned by kevent.
1139 * @param[in] fd_errno returned by kevent.
1140 * @param[in] ctx the network socket context.
1141 */
1143 int fd_errno, void *ctx)
1144{
1145 fr_network_socket_t *s = ctx;
1146 fr_network_t *nr = s->nr;
1147
1148 if (s->listen->app_io->error) {
1149 s->listen->app_io->error(s->listen);
1150
1151 } else if (flags & EV_EOF) {
1152 DEBUG2("Socket %s closed by peer", s->listen->name);
1153
1154 } else {
1155 ERROR("Socket %s errored - %s", s->listen->name, fr_syserror(fd_errno));
1156 }
1157
1159}
1160
1161
1162/** Write packets to the network.
 *
 * Writes the pending (partially written) message first, then messages from
 * s->waiting in heap order.
 *
 * If the transport would block, or only part of a message was written,
 * the message is:
 * - localized to the socket;
 * - saved in s->pending, with s->written recording how much was written;
 * - and a write callback is requested.
 *
 * Once everything is written, the write callback is removed and the
 * socket is marked as no longer blocked.
1163 *
1164 * @param el the event list
1165 * @param sockfd the socket which is ready to write
1166 * @param flags returned by kevent.
1167 * @param ctx the network socket context.
1168 */
1169static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
1170{
1171 fr_network_socket_t *s = ctx;
1172 fr_listen_t *li = s->listen;
1173 fr_network_t *nr = s->nr;
1175
1176 (void) talloc_get_type_abort(nr, fr_network_t);
1177
1178 /*
1179 * Start with the currently pending message, and then
1180 * work through the priority heap.
1181 */
1182 if (s->pending) {
1183 cd = s->pending;
1184 s->pending = NULL;
1185
1186 } else {
1187 cd = fr_heap_pop(&s->waiting);
1188 }
1189
1190 while (cd != NULL) {
1191 int rcode;
1192
1193 fr_assert(li == cd->listen);
1194 if (li->write_hexdump) HEXDUMP2(cd->m.data, cd->m.data_size, "%s writing ", li->name);
1195 rcode = li->app_io->write(li, cd->packet_ctx,
1196 cd->reply.request_time,
1197 cd->m.data, cd->m.data_size, s->written);
1198
1199 /*
1200 * Write of 0 bytes means an OS bug, and we just discard this packet.
 *
 * NOTE(review): the "discard" path also increments stats.out, so
 * discarded packets are counted as written.
1201 */
1202 if (rcode == 0) {
1203 RATE_LIMIT_GLOBAL(ERROR, "Discarding packet due to write returning zero for socket %s",
1204 s->listen->name);
1205 goto discard;
1206 }
1207
1208 /*
1209 * Or we have a write error.
1210 */
1211 if (rcode < 0) {
1212 /*
1213 * Stop processing the heap, and set the
1214 * pending message to the current one.
 *
 * NOTE(review): POSIX allows EAGAIN and EWOULDBLOCK to be
 * different values.  Checking only EWOULDBLOCK could treat
 * EAGAIN as a fatal error on such platforms.
1215 */
1216 if (errno == EWOULDBLOCK) {
 /*
  * Also reached via goto after a partial write (rcode > 0),
  * in which case errno is not meaningful.  The message is
  * localized with the socket as its context, presumably so
  * that it is released along with the socket.
  */
1217 save_pending:
1218 fr_assert(!s->pending);
1219
1220 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1221 fr_message_t *lm;
1222
1223 lm = fr_message_localize(s, &cd->m, sizeof(*cd));
1224 if (!lm) {
1225 ERROR("Failed saving pending packet");
1226 goto dead;
1227 }
1228
1229 cd = (fr_channel_data_t *) lm;
1230 }
1231
1232 if (!s->blocked) {
1234 PERROR("Failed adding write callback to event loop");
1235 goto dead;
1236 }
1237
1238 s->blocked = true;
1239 }
1240
1241 s->pending = cd;
1242 return;
1243 }
1244
1245 PERROR("Failed writing to socket %s", s->listen->name);
1246
1247 /*
1248 * As a special hack, check for something
1249 * that will never be returned from a
1250 * real write() routine. Which then
1251 * signals to us that we have to close
1252 * the socket, but NOT complain about it.
 *
 * NOTE(review): errno is examined after PERROR(); if logging can
 * modify errno, this check may misfire.  Consider saving errno
 * immediately after write().  Also, ECONNRESET can be returned by
 * a real send()/write() on a socket, despite the comment above.
1253 */
1254 if ((errno == ECONNREFUSED) || (errno == ECONNRESET)) goto dead;
1255
1256 if (li->app_io->error) li->app_io->error(li);
1257
1258 dead:
1259 fr_message_done(&cd->m);
1261 return;
1262 }
1263
1264 /*
1265 * If we've done a partial write, localize the message and save it as pending until the socket is writable again.
1266 */
1267 if ((size_t) rcode < cd->m.data_size) {
1268 s->written = rcode;
1269 goto save_pending;
1270 }
1271
1272 discard:
1273 s->written = 0;
1274
1275 /*
1276 * Reset for the next message.
1277 */
1278 fr_message_done(&cd->m);
1279 nr->stats.out++;
1280 s->stats.out++;
1281
1282 /*
1283 * Grab the next entry.
1284 */
1285 cd = fr_heap_pop(&s->waiting);
1286 }
1287
1288 /*
1289 * We've successfully written all of the packets. Remove
1290 * the write callback.
1291 */
1293 PERROR("Failed removing write callback from event loop");
1295 }
1296
1297 s->blocked = false;
1298}
1299
/*
 * talloc destructor for a fr_network_socket_t (installed with
 * talloc_set_destructor() when a socket is created).
 *
 * Removes the socket from nr->sockets and deletes its FD event unless
 * the socket is already marked dead.  It then closes the listener, via
 * app_io->close() if provided and close(2) on the FD otherwise.  Any
 * pending and queued messages are released, and the listener is freed.
 * Always returns 0, so talloc proceeds with the free.
 *
 * NOTE(review): this extracted copy is missing original lines 1300 (the
 * signature), 1303 (presumably the declaration of cd), 1308 and 1319
 * (presumably release of s->pending) - confirm against the real file.
 * Sockets are also inserted into nr->sockets_by_num; if line 1308 does not
 * remove s from that tree, it would keep a dangling pointer.
 */
1301{
1302 fr_network_t *nr = s->nr;
1304
1305 fr_assert(s->outstanding == 0);
1306
1307 fr_rb_delete(nr->sockets, s);
1309
1310 if (!s->dead) fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
1311
1312 if (s->listen->app_io->close) {
1313 s->listen->app_io->close(s->listen);
1314 } else {
1315 close(s->listen->fd);
1316 }
1317
1318 if (s->pending) {
1320 s->pending = NULL;
1321 }
1322
1323 /*
1324 * Clean up any queued entries.
1325 */
1326 while ((cd = fr_heap_pop(&s->waiting)) != NULL) {
1327 fr_message_done(&cd->m);
1328 }
1329
1330 /* s->waiting is already talloc parented from s */
1331 talloc_free(s->listen);
1332
1333 return 0;
1334}
1335
1336
1337/** Handle a network control message callback for a new listener
1338 *
1339 * @param[in] ctx the network
1340 * @param[in] data the message
1341 * @param[in] data_size size of the data
1342 * @param[in] now the current time
1343 */
1344static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1345{
1346 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1347 fr_listen_t *li;
1348
1349 fr_assert(data_size == sizeof(li));
1350
1351 if (data_size != sizeof(li)) return;
1352
1353 li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1354
1355 (void) fr_network_listen_add_self(nr, li);
1356}
1357
/*
 * Add a listener to this network.
 *
 * Non-socket listeners (li->non_socket_listener) are only given the event
 * list via app_io->event_list_set() and logged.  No fr_network_socket_t
 * is created for them.
 *
 * Socket listeners get a fr_network_socket_t, which takes talloc ownership
 * of li, plus:
 * - a heap for queued replies;
 * - a message set sized from the listener's num_messages and
 *   default_message_size;
 * - an FD event in nr->el;
 * - entries in nr->sockets and nr->sockets_by_num.
 *
 * Returns 0 on success.  Returns -1 if the message set or FD event could
 * not be created; in that case s, and with it li, is freed.
 *
 * NOTE(review): this extracted copy is missing original lines 1358 (the
 * signature), 1360 (presumably the declaration of s), 1424, 1427-1429
 * (the callbacks passed to fr_event_fd_insert()), 1441 and 1457 - confirm
 * against the real file.
 */
1359{
1361 fr_app_io_t const *app_io;
1362 size_t size;
1363 int num_messages;
1364
1365 fr_assert(li->app_io != NULL);
1366
1367 /*
1368 * Non-socket listeners just get told about the event
1369 * list, and nothing else.
1370 */
1371 if (li->non_socket_listener) {
1372 fr_assert(li->app_io->event_list_set != NULL);
1373 fr_assert(!li->app_io->read);
1374 fr_assert(!li->app_io->write);
1375
1376 li->app_io->event_list_set(li, nr->el, nr);
1377
1378 /*
1379 * We use fr_log() here to avoid the "Network - " prefix.
1380 */
1381 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listener %s bound to virtual server %s",
1382 li->name, cf_section_name2(li->server_cs));
1383
1384 return 0;
1385 }
1386
1387 s = talloc_zero(nr, fr_network_socket_t);
 /*
 * NOTE(review): allocation failure is only asserted here, while
 * MEM() is used for s->waiting just below.  If fr_assert() is
 * compiled out, a NULL s would be dereferenced.
 */
1388 fr_assert(s != NULL);
1389 talloc_steal(s, li);
1390
1391 s->nr = nr;
1392 s->listen = li;
1393 s->number = nr->num_sockets++;
1394
1395 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1396
1397 talloc_set_destructor(s, _network_socket_free);
1398
1399 /*
1400 * Put reasonable limits on the ring buffer size. Then
1401 * round it up to the nearest power of 2, which is
1402 * required by the ring buffer code.
1403 */
 /*
 * NOTE(review): nothing in this function rounds to a power of 2.
 * Only clamping to [128 KiB, 100 MiB] happens here; presumably the
 * rounding is done inside fr_message_set_create() - confirm.
 * li->num_messages is also narrowed to int here.
 */
1404 num_messages = s->listen->num_messages;
1405 if (num_messages < 8) num_messages = 8;
1406
1407 size = s->listen->default_message_size * num_messages;
1408 if (size < (1 << 17)) size = (1 << 17);
1409 if (size > (100 * 1024 * 1024)) size = (100 * 1024 * 1024);
1410
1411 /*
1412 * Allocate the ring buffer for messages and packets.
1413 */
1414 s->ms = fr_message_set_create(s, num_messages,
1415 sizeof(fr_channel_data_t),
1416 size, false);
 /*
 * NOTE(review): on the failure paths below, talloc_free(s) runs
 * _network_socket_free().  That calls fr_rb_delete() and (unless
 * s->dead) fr_event_fd_delete(), even though s has not yet been
 * inserted into nr->sockets or the event loop, and it closes the
 * listener's FD.  Confirm those calls tolerate missing entries.
 */
1417 if (!s->ms) {
1418 PERROR("Failed creating message buffers for network IO");
1419 talloc_free(s);
1420 return -1;
1421 }
1422
1423 app_io = s->listen->app_io;
1425
1426 if (fr_event_fd_insert(nr, NULL, nr->el, s->listen->fd,
1430 s) < 0) {
1431 PERROR("Failed adding new socket to network event loop");
1432 talloc_free(s);
1433 return -1;
1434 }
1435
1436 /*
1437 * Start off with write updates being paused. We don't
1438 * care about being able to write if there's nothing to
1439 * write.
1440 */
1442
1443 /*
1444 * Add the listener before calling the app_io, so that
1445 * the app_io can find the listener which we're adding
1446 * here.
1447 */
1448 (void) fr_rb_insert(nr->sockets, s);
1449 (void) fr_rb_insert(nr->sockets_by_num, s);
1450
1451 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1452
1453 /*
1454 * We use fr_log() here to avoid the "Network - " prefix.
1455 */
1456 fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listening on %s bound to virtual server %s",
1458
1459 DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
1460
1461 return 0;
1462}
1463
1464/** Handle a network control message callback for a new "watch directory"
1465 *
 * The payload is a pointer to the fr_listen_t to watch.  A
 * fr_network_socket_t is created for it, taking talloc ownership of li.
 * The app_io is given the event list, and a filter is inserted on the
 * listener's FD with the callbacks in `funcs` (its declaration is not
 * visible here).  The socket is then added to nr->sockets and
 * nr->sockets_by_num.  On failure the socket (and li) is freed.
 *
 * NOTE(review): this extracted copy is missing original lines 1476, 1478,
 * 1504 and 1515.  Presumably these are the declarations of s and funcs,
 * the size argument to fr_message_set_create(), and the assignment of
 * s->filter used below - confirm against the real file.
 *
1466 * @param[in] ctx the network
1467 * @param[in] data the message
1468 * @param[in] data_size size of the data
1469 * @param[in] now the current time
1470 */
1471static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1472{
1473 int num_messages;
1474 fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
 /*
 * NOTE(review): the initializer on the next line dereferences data
 * before data_size is checked below, and li is then re-read with
 * memcpy().  Reading it only after the size check would be safer.
 */
1475 fr_listen_t *li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1477 fr_app_io_t const *app_io;
1479
1480 if (!fr_cond_assert(data_size == sizeof(li))) return;
1481
1482 memcpy(&li, data, sizeof(li));
1483
1484 s = talloc_zero(nr, fr_network_socket_t);
1485 fr_assert(s != NULL);
1486 talloc_steal(s, li);
1487
1488 s->nr = nr;
1489 s->listen = li;
1490 s->number = nr->num_sockets++;
1491
1492 MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1493
1494 talloc_set_destructor(s, _network_socket_free);
1495
1496 /*
1497 * Allocate the ring buffer for messages and packets.
1498 */
1499 num_messages = s->listen->num_messages;
1500 if (num_messages < 8) num_messages = 8;
1501
1502 s->ms = fr_message_set_create(s, num_messages,
1503 sizeof(fr_channel_data_t),
1505 if (!s->ms) {
1506 PERROR("Failed creating message buffers for directory IO");
1507 talloc_free(s);
1508 return;
1509 }
1510
1511 app_io = s->listen->app_io;
1512
 /*
 * NOTE(review): unlike the socket path, event_list_set() is called
 * here before s is inserted into nr->sockets, so app_io cannot look
 * this listener up from within that call - confirm this is intended.
 */
1513 if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1514
1516
1517 if (fr_event_filter_insert(nr, NULL, nr->el, s->listen->fd, s->filter,
1518 &funcs,
1519 app_io->error ? fr_network_error : NULL,
1520 s) < 0) {
1521 PERROR("Failed adding directory monitor event loop");
1522 talloc_free(s);
1523 return;
1524 }
1525
1526 (void) fr_rb_insert(nr->sockets, s);
1527 (void) fr_rb_insert(nr->sockets_by_num, s);
1528
1529 DEBUG3("Using new socket with FD %d", s->listen->fd);
1530}
1531
1532/** Handle a network control message callback for a new worker
1533 *
 * The payload is a fr_worker_t pointer.  If the network already has
 * max_workers workers, the message is rejected with an error.  Otherwise:
 * - a fr_network_worker_t is allocated;
 * - a channel to the worker is created (failure is fatal);
 * - the worker is stored in the first free slot of nr->workers[], and
 *   nr->num_workers is incremented.
 *
 * NOTE(review): this extracted copy is missing original lines 1544
 * (presumably the declaration of w), 1560 and 1563-1564 - confirm against
 * the real file.
 *
1534 * @param[in] ctx the network
1535 * @param[in] data the message
1536 * @param[in] data_size size of the data
1537 * @param[in] now the current time
1538 */
1539static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1540{
1541 int i;
1542 fr_network_t *nr = ctx;
1543 fr_worker_t *worker;
1545
1546 fr_assert(data_size == sizeof(worker));
1547
1548 if (nr->num_workers >= nr->max_workers) {
1549 ERROR("Too many workers");
1550 return;
1551 }
1552
 /*
 * NOTE(review): data_size is only checked by fr_assert() above.  If
 * assertions are compiled out and data_size exceeds sizeof(worker),
 * this memcpy() overflows `worker`.  Copying sizeof(worker), or
 * returning on a size mismatch as the listener callback does, would
 * be safer.
 */
1553 memcpy(&worker, data, data_size);
1554 (void) talloc_get_type_abort(worker, fr_worker_t);
1555
1556 MEM(w = talloc_zero(nr, fr_network_worker_t));
1557
1558 w->worker = worker;
1559 w->channel = fr_worker_channel_create(worker, w, nr->control);
1561 fr_fatal_assert_msg(w->channel, "Failed creating new channel");
1562
1565
1566 /*
1567 * Insert the worker into the array of workers.
1568 */
1569 for (i = 0; i < nr->max_workers; i++) {
1570 if (nr->workers[i]) continue;
1571
1572 nr->workers[i] = w;
1573 nr->num_workers++;
1574 return;
1575 }
 /*
 * NOTE(review): if no free slot is found, w (and its channel) is left
 * unreferenced apart from being a talloc child of nr, and num_workers
 * is unchanged.
 */
1576}
1577
1578/** Handle a network control message callback for a packet sent to a socket
1579 *
 * The payload is a fr_network_inject_t.  The target listener is looked up
 * in nr->sockets.  If found, the packet is handed to app_io->inject().
 * When that returns 0, fr_network_read() is called on the socket so the
 * injected packet is read back.  The packet buffer is freed in all cases.
 *
 * NOTE(review): this extracted copy is missing original line 1589
 * (presumably the declaration of s) - confirm against the real file.
 *
1580 * @param[in] ctx the network
1581 * @param[in] data the message
1582 * @param[in] data_size size of the data
1583 * @param[in] now the current time
1584 */
1585static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1586{
1587 fr_network_t *nr = ctx;
1588 fr_network_inject_t my_inject;
1590
1591 fr_assert(data_size == sizeof(my_inject));
1592
 /*
 * NOTE(review): as above, data_size is only asserted.  A larger
 * value would overflow my_inject; copying sizeof(my_inject) would be
 * safer.
 */
1593 memcpy(&my_inject, data, data_size);
1594 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = my_inject.listen });
1595 if (!s) {
1596 talloc_free(my_inject.packet); /* MUST be its own TALLOC_CTX */
1597 return;
1598 }
1599
1600 /*
1601 * Inject the packet, and then read it back from the
1602 * network.
1603 */
 /*
 * NOTE(review): app_io->inject is called without a NULL check.
 * Confirm that every listener reachable here implements it.
 */
1604 if (s->listen->app_io->inject(s->listen, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
1605 fr_network_read(nr->el, s->listen->fd, 0, s);
1606 }
1607
1608 talloc_free(my_inject.packet);
1609}
1610
1611/** Run the event loop 'pre' callback
1612 *
1613 * This function MUST DO NO WORK. All it does is check if there's
1614 * work, and tell the event code to return to the main loop if
1615 * there's work to do.
1616 *
1617 * @param[in] now the current time.
1618 * @param[in] wake the time when the event loop will wake up.
1619 * @param[in] uctx the network
 * @return
 *	- 1 if replies are queued in nr->replies.
 *	- 0 otherwise.
 *
 * NOTE(review): original line 1621 (the function signature) is missing
 * from this extracted copy.
1620 */
1622{
1623 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1624
1625 if (fr_heap_num_elements(nr->replies) > 0) return 1;
1626
1627 return 0;
1628}
1629
1630/** Handle replies after all FD and timer events have been serviced
1631 *
 * Pops every reply from nr->replies and routes it to its socket, looked
 * up in nr->sockets by cd->listen.  For each reply:
 * - if the socket is not found, the message is released;
 * - non-localized replies decrement s->outstanding;
 * - if the socket is dead, the message is released, and the socket is
 *   freed once it has no outstanding replies;
 * - zero-length replies are released without being written;
 * - otherwise the reply is queued on s->waiting and, if the socket has no
 *   pending message, fr_network_write() is called to flush the queue.
 *
 * NOTE(review): this extracted copy is missing original lines 1636 (the
 * signature), 1638 and 1647 (presumably the declarations of cd and s).
 *
1632 * @param el the event loop
1633 * @param now the current time (mostly)
1634 * @param uctx the fr_network_t
1635 */
1637{
1639 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1640
1641 /*
1642 * Pull the replies off of our global heap, and try to
1643 * push them to the individual sockets.
1644 */
1645 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1646 fr_listen_t *li;
1648
1649 li = cd->listen;
1650
1651 /*
1652 * @todo - cache this somewhere so we don't need
1653 * to do an rbtree lookup for every packet.
1654 */
1655 s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1656
1657 /*
1658 * This shouldn't happen, but be safe...
1659 */
1660 if (!s) {
1661 fr_message_done(&cd->m);
1662 continue;
1663 }
1664
1665 if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1666 fr_assert(s->outstanding > 0);
1667 s->outstanding--;
1668 }
1669
1670 /*
1671 * Just mark the message done, and skip it.
1672 */
1673 if (s->dead) {
1674 fr_message_done(&cd->m);
1675
1676 /*
1677 * No more packets, it's safe to delete
1678 * the socket.
1679 */
1680 if (!s->outstanding) talloc_free(s);
1681
1682 continue;
1683 }
1684
1685 /*
1686 * No data to write to the socket, so we skip the message.
1687 */
1688 if (!cd->m.data_size) {
1689 fr_message_done(&cd->m);
1690 continue;
1691 }
1692
1693 (void) fr_heap_insert(&s->waiting, cd);
1694
1695 /*
1696 * No pending message, write it. If there is a pending write, the message will be left
1697 * in the waiting queue.
1698 */
1699 if (!s->pending) {
1700 fr_assert(!s->blocked);
1701 fr_network_write(nr->el, s->listen->fd, 0, s);
1702 }
1703 }
1704}
1705
1706/** Stop a network thread in an orderly way
1707 *
 * The following steps are taken in order:
 * - every socket in nr->sockets is freed (outstanding is forced to 0 first);
 * - all replies still queued in nr->replies are released;
 * - each worker is signalled that the network is closing;
 * - nr->exiting is set.
 * The event loop in fr_network() keeps running until nr->exiting is set
 * AND num_workers has dropped to 0.
 *
 * @return
 *	- 0 on success.
 *	- -1 if the socket tree could not be flattened (nothing is torn down).
 *
 * NOTE(review): this extracted copy is missing original lines 1710 (the
 * signature), 1712 (presumably the declaration of cd), 1778 (the
 * per-worker close signal) and 1782-1785 - confirm against the real file.
 *
1708 * @param[in] nr the network to stop
1709 */
1711{
1713
1714 (void) talloc_get_type_abort(nr, fr_network_t);
1715
1716 /*
1717 * Close the network sockets
1718 */
1719 {
1720 fr_network_socket_t **sockets;
1721 size_t len;
1722 size_t i;
1723
1724 if (fr_rb_flatten_inorder(nr, (void ***)&sockets, nr->sockets) < 0) return -1;
1725 len = talloc_array_length(sockets);
1726
1727 for (i = 0; i < len; i++) {
1728 /*
1729 * Force to zero so we don't trigger asserts
1730 * if packets are being processed and the
1731 * server exits.
1732 */
1733 sockets[i]->outstanding = 0;
1734 talloc_free(sockets[i]);
1735 }
1736
1737 talloc_free(sockets);
1738 }
1739
1740
1741 /*
1742 * Clean up all outstanding replies.
1743 *
1744 * We can't do this after signalling the
1745 * workers to close, because they free
1746 * their message sets, and we end up
1747 * getting random use-after-free errors
1748 * as there's a race between the network
1749 * popping replies, and the workers
1750 * freeing their message sets.
1751 *
1752 * This isn't perfect, and we might still
1753 * lose some replies, but it's good enough
1754 * for now.
1755 *
1756 * @todo - call transport "done" for the reply, so that
1757 * it knows the replies are done, too.
1758 */
1759 while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1760 fr_message_done(&cd->m);
1761 }
1762
1763 /*
1764 * Signal the workers that we're closing
1765 *
1766 * nr->num_workers is decremented every
1767 * time a worker closes a socket.
1768 *
1769 * When nr->num_workers == 0, the event
1770 * loop (fr_network()) will exit.
1771 */
1772 {
1773 int i;
1774
 /*
 * NOTE(review): this loop covers indices 0..num_workers-1.
 * However, new workers are stored in the first free slot of
 * workers[], and fr_network_stats_log() skips NULL entries across
 * all max_workers slots.  If workers[] can contain holes, this
 * may see a NULL entry or miss workers at higher indices - confirm.
 */
1775 for (i = 0; i < nr->num_workers; i++) {
1776 fr_network_worker_t *worker = nr->workers[i];
1777
1779 }
1780 }
1781
1784 nr->exiting = true;
1786
1787 return 0;
1788}
1789
1790/** Read handler for signal pipe
1791 *
1792 */
1793static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
1794{
1795 fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1796 uint8_t buff;
1797
1798 if (read(fd, &buff, sizeof(buff)) < 0) {
1799 ERROR("Failed reading signal - %s", fr_syserror(errno));
1800 return;
1801 }
1802
1803 fr_assert(buff == 1);
1804
1805 /*
1806 * fr_network_stop() will signal the workers
1807 * to exit (by closing their channels).
1808 *
1809 * When we get the ack, we decrement our
1810 * nr->num_workers counter.
1811 *
1812 * When the counter reaches 0, the event loop
1813 * exits.
1814 */
1815 DEBUG2("Signalled to exit");
1816
1817 if (unlikely(fr_network_destroy(nr) < 0)) {
1818 PERROR("Failed stopping network");
1819 }
1820}
1821
1822/** The main network worker function.
1823 *
 * Services nr->el until one of two things happens: the network has been
 * told to exit (nr->exiting) AND every worker has acknowledged the close
 * (num_workers == 0), or fr_event_corral() returns a negative value.
 * While replies are queued in nr->replies, the event list is polled
 * without waiting.
 *
 * NOTE(review): original line 1826 (the function signature) is missing
 * from this extracted copy.
 *
1824 * @param[in] nr the network data structure to run.
1825 */
1827{
1828 /*
1829 * Run until we're told to exit AND the number of
1830 * workers has dropped to zero.
1831 *
1832 * This is important as if we exit too early we
1833 * free the channels out from underneath the
1834 * workers and they read uninitialised memory.
1835 *
1836 * Whenever a worker ACKs our close notification
1837 * nr->num_workers is decremented, so when
1838 * nr->num_workers == 0, all workers have ACKd
1839 * our close and are no longer using the channel.
1840 */
1841 while (likely(!(nr->exiting && (nr->num_workers == 0)))) {
1842 bool wait_for_event;
1843 int num_events;
1844
1845 /*
1846 * There are runnable requests. We still service
1847 * the event loop, but we don't wait for events.
1848 */
1849 wait_for_event = (fr_heap_num_elements(nr->replies) == 0);
1850
1851 /*
1852 * Check the event list. If there's an error
1853 * (e.g. exit), we stop looping and clean up.
1854 */
1855 DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1856 num_events = fr_event_corral(nr->el, fr_time(), wait_for_event);
1857 DEBUG4("%u event(s) pending%s",
1858 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1859 if (num_events < 0) break;
1860
1861 /*
1862 * Service outstanding events.
1863 */
1864 if (num_events > 0) {
1865 DEBUG4("Servicing event(s)");
1866 fr_event_service(nr->el);
1867 }
1868 }
1869 return;
1870}
1871
1872/** Signal a network thread to exit
1873 *
 * Writes a single 0x01 byte to nr->signal_pipe[1].  The read end is
 * watched on the network's event list by _signal_pipe_read(), which then
 * calls fr_network_destroy().  Both pipe ends are set non-blocking when
 * the network is created, so this returns -1 rather than blocking if the
 * pipe cannot accept the byte.
 *
 * NOTE(review): original line 1881 (the function signature) is missing
 * from this extracted copy.
 *
1874 * @note Request to exit will be processed asynchronously.
1875 *
1876 * @param[in] nr the network data structure to manage
1877 * @return
1878 * - 0 on success.
1879 * - -1 on failure.
1880 */
1882{
1883 if (write(nr->signal_pipe[1], &(uint8_t){ 0x01 }, 1) < 0) {
1884 fr_strerror_printf("Failed signalling network thread to exit - %s", fr_syserror(errno));
1885 return -1;
1886 }
1887
1888 return 0;
1889}
1890
1891/** Free any resources associated with a network thread
1892 *
 * talloc destructor for fr_network_t (installed by fr_network_create()).
 * Closes whichever ends of the signal pipe are open.  Both are set to -1
 * before the pipe is created, so this is safe after an early creation
 * failure.  Always returns 0, so the free proceeds.
 *
 * NOTE(review): original line 1894 (the function signature) is missing
 * from this extracted copy.
1893 */
1895{
1896 if (nr->signal_pipe[0] >= 0) close(nr->signal_pipe[0]);
1897 if (nr->signal_pipe[1] >= 0) close(nr->signal_pipe[1]);
1898
1899 return 0;
1900}
1901
1902/** Create a network
1903 *
1904 * @param[in] ctx The talloc ctx
1905 * @param[in] el The event list
1906 * @param[in] name Networker identifier.
1907 * @param[in] logger The destination for all logging messages
1908 * @param[in] lvl Log level
1909 * @param[in] config configuration structure.
1910 * @return
1911 * - NULL on error
1912 * - fr_network_t on success
 *
 * The returned network is the talloc parent of its atomic queue, control
 * plane and reply heap.  It registers pre/post callbacks and a
 * signal-pipe reader on el.
 *
 * NOTE(review): this extracted copy is missing the following original
 * lines - confirm against the real file:
 * - 1916 (the `config` parameter) and 1934;
 * - 1960 (presumably allocation of nr->rb);
 * - 1968, 1973, 1978, 1983 and 1988 (presumably the control callback
 *   registrations);
 * - 1996 and 2002 (presumably allocation of nr->sockets and
 *   nr->sockets_by_num).
1913 */
1914fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
1915 fr_log_t const *logger, fr_log_lvl_t lvl,
1917{
1918 fr_network_t *nr;
1919
1920 nr = talloc_zero(ctx, fr_network_t);
1921 if (!nr) {
1922 fr_strerror_const("Failed allocating memory");
1923 return NULL;
1924 }
1925 talloc_set_destructor(nr, _fr_network_free);
1926
 /*
 * NOTE(review): the result is not checked, although nr->name is
 * used as the log prefix (LOG_PREFIX).
 */
1927 nr->name = talloc_strdup(nr, name);
1928
1929 nr->thread_id = pthread_self();
1930 nr->el = el;
1931 nr->log = logger;
1932 nr->lvl = lvl;
1933
1935 nr->num_workers = 0;
1936 nr->signal_pipe[0] = -1;
1937 nr->signal_pipe[1] = -1;
1938 if (config) nr->config = *config;
1939
 /*
 * NOTE(review): unlike the other failure paths, no error message
 * is set when this allocation fails.
 */
1940 nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
1941 if (!nr->aq_control) {
1942 talloc_free(nr);
1943 return NULL;
1944 }
1945
1946 nr->control = fr_control_create(nr, el, nr->aq_control);
1947 if (!nr->control) {
1948 fr_strerror_const_push("Failed creating control queue");
1949 fail:
1950 talloc_free(nr);
1951 return NULL;
1952 }
1953
1954 /*
1955 * @todo - rely on thread-local variables. And then the
1956 * various users of this can check if (rb == nr->rb), and
1957 * if so, skip the whole control plane / kevent /
1958 * whatever roundabout thing.
1959 */
1961 if (!nr->rb) {
1962 fr_strerror_const_push("Failed creating ring buffer");
1963 fail2:
1964 talloc_free(nr->control);
1965 goto fail;
1966 }
1967
1969 fr_strerror_const_push("Failed adding channel callback");
1970 goto fail2;
1971 }
1972
1974 fr_strerror_const_push("Failed adding socket callback");
1975 goto fail2;
1976 }
1977
1979 fr_strerror_const_push("Failed adding socket callback");
1980 goto fail2;
1981 }
1982
1984 fr_strerror_const_push("Failed adding worker callback");
1985 goto fail2;
1986 }
1987
1989 fr_strerror_const_push("Failed adding packet injection callback");
1990 goto fail2;
1991 }
1992
1993 /*
1994 * Create the various heaps.
1995 */
1997 if (!nr->sockets) {
1998 fr_strerror_const_push("Failed creating listen tree for sockets");
1999 goto fail2;
2000 }
2001
2003 if (!nr->sockets_by_num) {
2004 fr_strerror_const_push("Failed creating number tree for sockets");
2005 goto fail2;
2006 }
2007
2008 nr->replies = fr_heap_alloc(nr, reply_cmp, fr_channel_data_t, channel.heap_id, 0);
2009 if (!nr->replies) {
2010 fr_strerror_const_push("Failed creating heap for replies");
2011 goto fail2;
2012 }
2013
 /*
 * NOTE(review): from here on, failures use fr_strerror_const(),
 * which replaces the error stack, whereas the failures above push
 * onto it.  The fr_nonblock() failures below set no message at all.
 */
2014 if (fr_event_pre_insert(nr->el, fr_network_pre_event, nr) < 0) {
2015 fr_strerror_const("Failed adding pre-check to event list");
2016 goto fail2;
2017 }
2018
2019 if (fr_event_post_insert(nr->el, fr_network_post_event, nr) < 0) {
2020 fr_strerror_const("Failed inserting post-processing event");
2021 goto fail2;
2022 }
2023
 /*
 * NOTE(review): if any step below fails, nr is freed via fail2, but
 * the pre/post callbacks registered above are not removed from el in
 * this function.  Unless the event list drops them itself, it would
 * keep calling them with a freed uctx - confirm.
 */
2024 if (pipe(nr->signal_pipe) < 0) {
2025 fr_strerror_printf("Failed initialising signal pipe - %s", fr_syserror(errno));
2026 goto fail2;
2027 }
2028 if (fr_nonblock(nr->signal_pipe[0]) < 0) goto fail2;
2029 if (fr_nonblock(nr->signal_pipe[1]) < 0) goto fail2;
2030
2031 if (fr_event_fd_insert(nr, NULL, nr->el, nr->signal_pipe[0], _signal_pipe_read, NULL, NULL, nr) < 0) {
2032 fr_strerror_const("Failed inserting event for signal pipe");
2033 goto fail2;
2034 }
2035
2036 return nr;
2037}
2038
2039int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
2040{
2041 if (num < 0) return -1;
2042 if (num == 0) return 0;
2043
2044 stats[0] = nr->stats.in;
2045 if (num >= 2) stats[1] = nr->stats.out;
2046 if (num >= 3) stats[2] = nr->stats.dup;
2047 if (num >= 4) stats[3] = nr->stats.dropped;
2048 if (num >= 5) stats[4] = nr->num_workers;
2049
2050 if (num <= 5) return num;
2051
2052 return 5;
2053}
2054
2055void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
2056{
2057 int i;
2058
2059 /*
2060 * Dump all of the channel statistics.
2061 */
2062 for (i = 0; i < nr->max_workers; i++) {
2063 if (!nr->workers[i]) continue;
2064
2065 fr_channel_stats_log(nr->workers[i]->channel, log, __FILE__, __LINE__);
2066 }
2067}
2068
2069static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2070{
2071 fr_network_t const *nr = ctx;
2072
2073 fprintf(fp, "count.in\t%" PRIu64 "\n", nr->stats.in);
2074 fprintf(fp, "count.out\t%" PRIu64 "\n", nr->stats.out);
2075 fprintf(fp, "count.dup\t%" PRIu64 "\n", nr->stats.dup);
2076 fprintf(fp, "count.dropped\t%" PRIu64 "\n", nr->stats.dropped);
2077 fprintf(fp, "count.sockets\t%u\n", fr_rb_num_elements(nr->sockets));
2078
2079 return 0;
2080}
2081
/*
 * "show network socket list": print one line per socket in nr->sockets,
 * in tree order.  Sockets whose app_io provides get_name() are printed as
 * "<number>\t<name>".  Others print only the app_io module name, without
 * the socket number.
 *
 * NOTE(review): the two output formats differ, and only one includes the
 * socket number that "stats network socket INTEGER" expects - confirm this
 * is intended.  Original lines 2085-2086 (presumably the declarations of
 * iter and s) are missing from this extracted copy.
 */
2082static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2083{
2084 fr_network_t const *nr = ctx;
2087
2088 // @todo - note that this isn't thread-safe!
2089
2090 for (s = fr_rb_iter_init_inorder(nr->sockets, &iter);
2091 s != NULL;
2092 s = fr_rb_iter_next_inorder(nr->sockets, &iter)) {
2093 if (!s->listen->app_io->get_name) {
2094 fprintf(fp, "%s\n", s->listen->app_io->common.name);
2095 } else {
2096 fprintf(fp, "%d\t%s\n", s->number, s->listen->app_io->get_name(s->listen));
2097 }
2098 }
2099 return 0;
2100}
2101
/*
 * "stats network socket INTEGER": print the counters of one socket, looked
 * up by number in nr->sockets_by_num.  If no socket has that number, an
 * error is written to fp_err and -1 is returned.  Otherwise returns 0.
 *
 * NOTE(review): as with cmd_socket_list(), no locking is visible here
 * while reading the network's data.  Original line 2105 (presumably the
 * declaration of s) is missing from this extracted copy.
 */
2102static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
2103{
2104 fr_network_t const *nr = ctx;
2106
2107 s = fr_rb_find(nr->sockets_by_num, &(fr_network_socket_t){ .number = info->box[0]->vb_uint32 });
2108 if (!s) {
2109 fprintf(fp_err, "No such socket number '%s'.\n", info->argv[0]);
2110 return -1;
2111 }
2112
2113 fprintf(fp, "count.in\t%" PRIu64 "\n", s->stats.in);
2114 fprintf(fp, "count.out\t%" PRIu64 "\n", s->stats.out);
2115 fprintf(fp, "count.dup\t%" PRIu64 "\n", s->stats.dup);
2116 fprintf(fp, "count.dropped\t%" PRIu64 "\n", s->stats.dropped);
2117
2118 return 0;
2119}
2120
2121
/*
 * Command table for network threads.  The entries are: "stats network",
 * "stats network self", "stats network socket INTEGER", "show network"
 * and "show network socket list".
 *
 * NOTE(review): original lines 2122 (the array declaration) and 2166
 * (presumably the table terminator) are missing from this extracted copy.
 */
2123 {
2124 .parent = "stats",
2125 .name = "network",
2126 .help = "Statistics for network threads.",
2127 .read_only = true
2128 },
2129
2130 {
2131 .parent = "stats network",
2132 .add_name = true,
2133 .name = "self",
2134 .func = cmd_stats_self,
2135 .help = "Show statistics for a specific network thread.",
2136 .read_only = true
2137 },
2138
2139 {
2140 .parent = "stats network",
2141 .add_name = true,
2142 .name = "socket",
2143 .syntax = "INTEGER",
2144 .func = cmd_stats_socket,
2145 .help = "Show statistics for a specific socket",
2146 .read_only = true
2147 },
2148
2149 {
2150 .parent = "show",
2151 .name = "network",
2152 .help = "Show information about network threads.",
2153 .read_only = true
2154 },
2155
2156 {
2157 .parent = "show network",
2158 .add_name = true,
2159 .name = "socket",
2160 .syntax = "list",
2161 .func = cmd_socket_list,
2162 .help = "List the sockets associated with this network thread.",
2163 .read_only = true
2164 },
2165
2167};
static int const char char buffer[256]
Definition acutest.h:578
fr_io_close_t close
Close the transport.
Definition app_io.h:60
fr_io_data_read_t read
Read from a socket to a data buffer.
Definition app_io.h:47
module_t common
Common fields to all loadable modules.
Definition app_io.h:34
fr_io_signal_t error
There was an error on the socket.
Definition app_io.h:59
fr_app_event_list_set_t event_list_set
Called by the network thread to pass an event list for use by the app_io_t.
Definition app_io.h:36
fr_io_data_inject_t inject
Inject a packet into a socket.
Definition app_io.h:50
fr_io_data_vnode_t vnode
Handle notifications that the VNODE has changed.
Definition app_io.h:52
fr_io_data_write_t write
Write from a data buffer to a socket.
Definition app_io.h:48
fr_io_name_t get_name
get the socket name
Definition app_io.h:70
Public structure describing an I/O path for a protocol.
Definition app_io.h:33
fr_app_priority_get_t priority
Assign a priority to the packet.
Definition application.h:90
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:487
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition cf_util.c:1184
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition channel.c:922
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition channel.c:306
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition channel.c:930
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition channel.c:910
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition channel.c:963
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
#define PRIORITY_NORMAL
Definition channel.h:151
#define PRIORITY_NOW
Definition channel.h:149
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_INJECT
Definition control.h:60
#define FR_CONTROL_ID_DIRECTORY
Definition control.h:59
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_ID_LISTEN
Definition control.h:57
#define FR_CONTROL_ID_WORKER
Definition control.h:58
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:176
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
static int sockfd
Definition dhcpclient.c:56
fr_dict_t const * fr_dict_proto_dict(fr_dict_t const *dict)
Definition dict_util.c:5283
#define fr_event_fd_insert(...)
Definition event.h:248
fr_event_filter_t
The type of filter to install for an FD.
Definition event.h:83
@ FR_EVENT_FILTER_VNODE
Filter for vnode subfilters.
Definition event.h:85
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition event.h:84
#define fr_event_filter_update(...)
Definition event.h:240
#define fr_event_filter_insert(...)
Definition event.h:235
#define FR_EVENT_RESUME(_s, _f)
Re-add the filter for a func from kevent.
Definition event.h:132
#define FR_EVENT_SUSPEND(_s, _f)
Temporarily remove the filter for a func from kevent.
Definition event.h:116
fr_event_fd_cb_t extend
Additional files were added to a directory.
Definition event.h:199
Callbacks for the FR_EVENT_FILTER_IO filter.
Definition event.h:189
Structure describing a modification to a filter's state.
Definition event.h:97
Callbacks for the FR_EVENT_FILTER_VNODE filter.
Definition event.h:196
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:325
unsigned int fr_heap_index_t
Definition heap.h:80
#define fr_heap_alloc(_ctx, _cmp, _type, _field, _init)
Creates a heap that can be used with non-talloced elements.
Definition heap.h:100
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define FR_HEAP_INDEX_INVALID
Definition heap.h:83
The main heap structure.
Definition heap.h:66
talloc_free(hp)
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:428
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:340
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
bool read_hexdump
Do we debug hexdump packets as they're read.
Definition listen.h:53
size_t num_messages
for the message ring buffer
Definition listen.h:57
bool non_socket_listener
special internal listener that does not use sockets.
Definition listen.h:47
char const * name
printable name for this socket - set by open
Definition listen.h:29
void const * app_instance
Definition listen.h:39
size_t default_message_size
copied from app_io, but may be changed
Definition listen.h:56
bool write_hexdump
Do we debug hexdump packets as they're written.
Definition listen.h:54
fr_app_t const * app
Definition listen.h:38
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:42
bool no_write_callback
sometimes we don't need to do writes
Definition listen.h:46
int fd
file descriptor for this socket - set by open
Definition listen.h:28
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
bool needs_full_setup
Set to true to avoid the short cut when adding the listener.
Definition listen.h:48
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
fr_ring_buffer_t * rb
ring buffer for my control-plane messages
Definition network.c:126
fr_cmd_table_t cmd_network_table[]
Definition network.c:2122
size_t written
however much we did in a partial write
Definition network.c:92
int fr_network_listen_send_packet(fr_network_t *nr, fr_listen_t *parent, fr_listen_t *li, const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
Send a packet to the worker.
Definition network.c:799
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition network.c:122
static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition network.c:2102
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition network.c:236
bool suspended
whether or not we're suspended.
Definition network.c:117
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in a different thread.
Definition network.c:305
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition network.c:1710
fr_network_t * nr
O(N) issues in talloc.
Definition network.c:77
fr_io_stats_t stats
Definition network.c:70
fr_listen_t * listen
Definition network.c:52
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new listener.
Definition network.c:1344
uint8_t * packet
Definition network.c:53
static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2069
fr_log_t const * log
log destination
Definition network.c:119
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition network.c:290
static int _fr_network_free(fr_network_t *nr)
Free any resources associated with a network thread.
Definition network.c:1894
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a packet sent to a socket.
Definition network.c:1585
fr_heap_index_t heap_id
for the sockets_by_num heap
Definition network.c:79
#define RTT(_old, _new)
Definition network.c:508
void fr_network(fr_network_t *nr)
The main network worker function.
Definition network.c:1826
fr_message_set_t * ms
message buffers for this socket.
Definition network.c:89
int fr_network_listen_delete(fr_network_t *nr, fr_listen_t *li)
Delete a socket from a network.
Definition network.c:271
int num_blocked
number of blocked workers
Definition network.c:138
static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
ssize_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li)
Get the number of outstanding packets.
Definition network.c:841
char const * name
Network ID for logging.
Definition network.c:113
void fr_network_worker_add_self(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in the same thread.
Definition network.c:325
unsigned int outstanding
number of outstanding packets sent to the worker
Definition network.c:86
static _Thread_local fr_ring_buffer_t * fr_network_rb
Definition network.c:49
int number
unique ID
Definition network.c:78
static fr_event_update_t const resume_write[]
Definition network.c:472
fr_time_delta_t predicted
predicted processing time for one packet
Definition network.c:64
static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx)
static fr_event_update_t const pause_read[]
Definition network.c:457
fr_worker_t * worker
worker pointer
Definition network.c:69
int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
Definition network.c:1073
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition network.c:1881
#define MAX_WORKERS
Definition network.c:47
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
Inject a packet for a listener to read.
Definition network.c:410
fr_listen_t * listen
I/O ctx and functions.
Definition network.c:87
int num_sockets
actually a counter...
Definition network.c:141
fr_rb_node_t listen_node
rbtree node for looking up by listener.
Definition network.c:74
static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
Get a notification that a vnode changed.
Definition network.c:1117
static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Read handler for signal pipe.
Definition network.c:1793
#define OUTSTANDING(_x)
Definition network.c:625
static int8_t reply_cmp(void const *one, void const *two)
Definition network.c:157
int num_workers
number of active workers
Definition network.c:137
static int8_t socket_num_cmp(void const *one, void const *two)
Definition network.c:186
int num_pending_workers
number of workers we're waiting to start.
Definition network.c:139
fr_rb_tree_t * sockets
list of sockets we're managing, ordered by the listener
Definition network.c:134
pthread_t thread_id
for self
Definition network.c:115
fr_log_lvl_t lvl
debug log level
Definition network.c:120
int signal_pipe[2]
Pipe for signalling the worker in an orderly way.
Definition network.c:143
fr_channel_data_t * pending
the currently pending partial packet
Definition network.c:94
static int8_t socket_listen_cmp(void const *one, void const *two)
Definition network.c:179
static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
Write packets to the network.
Definition network.c:1169
fr_event_list_t * el
our event list
Definition network.c:128
fr_heap_t * replies
replies from the worker, ordered by priority / origin time
Definition network.c:130
static int fr_network_send_request(fr_network_t *nr, fr_channel_data_t *cd)
Send a message on the "best" channel.
Definition network.c:632
static fr_event_update_t const resume_read[]
Definition network.c:462
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition network.c:2055
fr_heap_t * waiting
packets waiting to be written
Definition network.c:95
int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
Definition network.c:2039
fr_heap_index_t heap_id
workers are in a heap
Definition network.c:62
bool blocked
is this worker blocked?
Definition network.c:66
static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
Read a packet from the network.
Definition network.c:900
static bool is_network_thread(fr_network_t const *nr)
Definition network.c:224
fr_rb_node_t num_node
rbtree node for looking up by number.
Definition network.c:75
static void fr_network_error(UNUSED fr_event_list_t *el, UNUSED int sockfd, int flags, int fd_errno, void *ctx)
Handle errors for a socket.
Definition network.c:1142
fr_io_stats_t stats
Definition network.c:96
static fr_ring_buffer_t * fr_network_rb_init(void)
Initialise thread local storage.
Definition network.c:206
fr_channel_data_t * cd
cached in case of allocation & read error
Definition network.c:90
static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
Definition network.c:1358
static void fr_network_suspend(fr_network_t *nr)
Definition network.c:477
bool dead
is it dead?
Definition network.c:83
size_t leftover
leftover data from a previous read
Definition network.c:91
static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx)
fr_network_worker_t * workers[MAX_WORKERS]
each worker
Definition network.c:149
fr_time_t recv_time
Definition network.c:55
static void fr_network_unsuspend(fr_network_t *nr)
Definition network.c:492
fr_rb_tree_t * sockets_by_num
ordered by number;
Definition network.c:135
fr_network_config_t config
configuration
Definition network.c:148
void fr_network_listen_read(fr_network_t *nr, fr_listen_t *li)
Signal the network to read from a listener.
Definition network.c:336
static int8_t waiting_cmp(void const *one, void const *two)
Definition network.c:168
fr_io_stats_t stats
Definition network.c:132
static int _fr_network_rb_free(void *arg)
Definition network.c:197
static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the network side.
Definition network.c:516
int max_workers
maximum number of allowed workers
Definition network.c:140
void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, void *packet_ctx, fr_time_t request_time)
Inject a packet for a listener to write.
Definition network.c:362
bool exiting
are we exiting?
Definition network.c:146
fr_event_filter_t filter
what type of filter it is
Definition network.c:81
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
Definition network.c:857
static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new "watch directory".
Definition network.c:1471
fr_channel_t * channel
channel to the worker
Definition network.c:68
static fr_event_update_t const pause_write[]
Definition network.c:467
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition network.c:1914
static int _network_socket_free(fr_network_socket_t *s)
Definition network.c:1300
static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a network control message callback for a channel.
Definition network.c:564
static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition network.c:2082
fr_time_delta_t cpu_time
how much CPU time this worker has spent
Definition network.c:63
fr_control_t * control
the control plane
Definition network.c:124
bool blocked
is it blocked?
Definition network.c:84
Associate a worker thread with a network thread.
Definition network.c:61
uint32_t max_outstanding
Definition network.h:46
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
#define HEXDUMP2(_data, _len, _fmt,...)
Definition log.h:734
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:653
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2193
int fr_event_pre_delete(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Delete a pre-event callback from the event list.
Definition event.c:1975
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2058
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2029
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1953
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition event.c:1203
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2007
Stores all information relating to an event list.
Definition event.c:377
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition log.c:577
fr_log_lvl_t
Definition log.h:67
@ L_DBG
Only displayed when debugging is enabled.
Definition log.h:59
unsigned int uint32_t
long int ssize_t
unsigned char uint8_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:128
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:196
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1001
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition message.c:248
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:947
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition message.c:1090
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
@ FR_MESSAGE_USED
Definition message.h:39
@ FR_MESSAGE_LOCALIZED
Definition message.h:40
fr_message_status_t status
free, used, done, etc.
Definition message.h:45
int fr_nonblock(UNUSED int fd)
Definition misc.c:294
static const conf_parser_t config[]
Definition base.c:169
#define fr_assert(_expr)
Definition rad_assert.h:38
#define DEBUG2(fmt,...)
static fr_app_io_t app_io
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition rand.c:105
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:246
int fr_rb_flatten_inorder(TALLOC_CTX *ctx, void **out[], fr_rb_tree_t *tree)
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:321
The main red black tree structure.
Definition rb.h:73
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
static char buff[sizeof("18446744073709551615")+3]
Definition size_tests.c:41
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
Definition log.h:96
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
#define talloc_get_type_abort_const
Definition talloc.h:245
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:255
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
#define fr_time_wrap(_time)
Definition time.h:145
#define fr_time_delta_ispos(_a)
Definition time.h:290
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
static fr_event_list_t * el
static fr_slen_t parent
Definition pair.h:858
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:732
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1334
fr_dict_t const * virtual_server_dict_by_cs(CONF_SECTION const *server_cs)
Return the namespace for the virtual server specified by a config section.
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1624
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1651
A worker which takes packets from a master, and processes them.
Definition worker.c:99