The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
network.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 8d77c0efb30c2b28881d7665e14a33317eb63141 $
19  *
20  * @brief Receiver of socket data, which sends messages to the workers.
21  * @file io/network.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: 8d77c0efb30c2b28881d7665e14a33317eb63141 $")
26 
27 #define LOG_PREFIX nr->name
28 
29 #define LOG_DST nr->log
30 
31 #include <freeradius-devel/util/event.h>
32 #include <freeradius-devel/util/misc.h>
33 #include <freeradius-devel/util/rand.h>
34 #include <freeradius-devel/util/rb.h>
35 #include <freeradius-devel/util/syserror.h>
36 #include <freeradius-devel/util/atexit.h>
37 #include <freeradius-devel/util/talloc.h>
38 
39 #include <freeradius-devel/io/channel.h>
40 #include <freeradius-devel/io/control.h>
41 #include <freeradius-devel/io/listen.h>
42 #include <freeradius-devel/io/network.h>
43 #include <freeradius-devel/io/queue.h>
44 #include <freeradius-devel/io/ring_buffer.h>
45 #include <freeradius-devel/io/worker.h>
46 
47 #define MAX_WORKERS 64
48 
49 static _Thread_local fr_ring_buffer_t *fr_network_rb;
50 
51 typedef struct {
54  size_t packet_len;
57 
58 /** Associate a worker thread with a network thread
59  *
60  */
61 typedef struct {
62  fr_heap_index_t heap_id; //!< workers are in a heap
63  fr_time_delta_t cpu_time; //!< how much CPU time this worker has spent
64  fr_time_delta_t predicted; //!< predicted processing time for one packet
65 
66  bool blocked; //!< is this worker blocked?
67 
68  fr_channel_t *channel; //!< channel to the worker
69  fr_worker_t *worker; //!< worker pointer
72 
73 typedef struct {
74  fr_rb_node_t listen_node; //!< rbtree node for looking up by listener.
75  fr_rb_node_t num_node; //!< rbtree node for looking up by number.
76 
77  fr_network_t *nr; //!< O(N) issues in talloc
78  int number; //!< unique ID
79  fr_heap_index_t heap_id; //!< for the sockets_by_num heap
80 
81  fr_event_filter_t filter; //!< what type of filter it is
82 
83  bool dead; //!< is it dead?
84  bool blocked; //!< is it blocked?
85 
86  unsigned int outstanding; //!< number of outstanding packets sent to the worker
87  fr_listen_t *listen; //!< I/O ctx and functions.
88 
89  fr_message_set_t *ms; //!< message buffers for this socket.
90  fr_channel_data_t *cd; //!< cached in case of allocation & read error
91  size_t leftover; //!< leftover data from a previous read
92  size_t written; //!< however much we did in a partial write
93 
94  fr_channel_data_t *pending; //!< the currently pending partial packet
95  fr_heap_t *waiting; //!< packets waiting to be written
98 
99 /*
100  * We have an array of workers, so we can index the workers in
101  * O(1) time. remove the heap of "workers ordered by CPU time"
102  * when we send a packet to a worker, just update the predicted
103  * CPU time in place. when we receive a reply from a worker,
104  * just update the predicted CPU time in place.
105  *
106  * when we need to choose a worker, pick 2 at random, and then
107  * choose the one with the lower cpu time. For background, see
108  * "Power of Two-Choices" and
109  * https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
110  * https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
111  */
112 struct fr_network_s {
113  char const *name; //!< Network ID for logging.
114 
115  pthread_t thread_id; //!< for self
116 
117  bool suspended; //!< whether or not we're suspended.
118 
119  fr_log_t const *log; //!< log destination
120  fr_log_lvl_t lvl; //!< debug log level
121 
122  fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
123 
124  fr_control_t *control; //!< the control plane
125 
126  fr_ring_buffer_t *rb; //!< ring buffer for my control-plane messages
127 
128  fr_event_list_t *el; //!< our event list
129 
130  fr_heap_t *replies; //!< replies from the worker, ordered by priority / origin time
131 
133 
134  fr_rb_tree_t *sockets; //!< list of sockets we're managing, ordered by the listener
135  fr_rb_tree_t *sockets_by_num; //!< ordered by number;
136 
137  int num_workers; //!< number of active workers
138  int num_blocked; //!< number of blocked workers
139  int num_pending_workers; //!< number of workers we're waiting to start.
140  int max_workers; //!< maximum number of allowed workers
141  int num_sockets; //!< actually a counter...
142 
143  int signal_pipe[2]; //!< Pipe for signalling the worker in an orderly way.
144  ///< This is more deterministic than using async signals.
145 
146  bool exiting; //!< are we exiting?
147 
148  fr_network_config_t config; //!< configuration
150 };
151 
152 static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx);
153 static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx);
155 static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx);
156 
157 static int8_t reply_cmp(void const *one, void const *two)
158 {
159  fr_channel_data_t const *a = one, *b = two;
160  int ret;
161 
162  ret = CMP(a->priority, b->priority);
163  if (ret != 0) return ret;
164 
165  return fr_time_cmp(a->m.when, b->m.when);
166 }
167 
168 static int8_t waiting_cmp(void const *one, void const *two)
169 {
170  fr_channel_data_t const *a = one, *b = two;
171  int ret;
172 
173  ret = CMP(a->priority, b->priority);
174  if (ret != 0) return ret;
175 
176  return fr_time_cmp(a->reply.request_time, b->reply.request_time);
177 }
178 
179 static int8_t socket_listen_cmp(void const *one, void const *two)
180 {
181  fr_network_socket_t const *a = one, *b = two;
182 
183  return CMP(a->listen, b->listen);
184 }
185 
186 static int8_t socket_num_cmp(void const *one, void const *two)
187 {
188  fr_network_socket_t const *a = one, *b = two;
189 
190  return CMP(a->number, b->number);
191 }
192 
193 /*
194  * Explicitly cleanup the memory allocated to the ring buffer,
195  * just in case valgrind complains about it.
196  */
/** Free a talloc chunk handed in as an opaque pointer
 *
 * @param[in] arg	the chunk to free.
 * @return whatever talloc_free() returned.
 */
static int _fr_network_rb_free(void *arg)
{
	int rcode = talloc_free(arg);

	return rcode;
}
201 
202 /** Initialise thread local storage
203  *
204  * @return fr_ring_buffer_t for messages
205  */
207 {
209 
210  rb = fr_network_rb;
211  if (rb) return rb;
212 
214  if (!rb) {
215  fr_perror("Failed allocating memory for network ring buffer");
216  return NULL;
217  }
218 
220 
221  return rb;
222 }
223 
224 static inline bool is_network_thread(fr_network_t const *nr)
225 {
226  return (pthread_equal(pthread_self(), nr->thread_id) != 0);
227 }
228 
229 static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen);
230 
231 /** Add a fr_listen_t to a network
232  *
233  * @param nr the network
234  * @param li the listener
235  */
237 {
239 
240  /*
241  * Skip a bunch of work if we're already in the network thread.
242  */
243  if (is_network_thread(nr) && !li->needs_full_setup) {
244  return fr_network_listen_add_self(nr, li);
245  }
246 
248  if (!rb) return -1;
249 
250  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
251 }
252 
253 
254 /** Delete a socket from a network. MUST be called only by the listener itself!.
255  *
256  * @param nr the network
257  * @param li the listener
258  */
260 {
262 
264 
265  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
266  if (!s) return -1;
267 
268  fr_network_socket_dead(nr, s);
269 
270  return 0;
271 }
272 
273 /** Add a "watch directory" call to a network
274  *
275  * @param nr the network
276  * @param li the listener
277  */
279 {
281 
283  if (!rb) return -1;
284 
285  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_DIRECTORY, &li, sizeof(li));
286 }
287 
288 /** Add a worker to a network
289  *
290  * @param nr the network
291  * @param worker the worker
292  */
294 {
296 
298  if (!rb) return -1;
299 
300  (void) talloc_get_type_abort(nr, fr_network_t);
301  (void) talloc_get_type_abort(worker, fr_worker_t);
302 
303  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_WORKER, &worker, sizeof(worker));
304 }
305 
306 /** Signal the network to read from a listener
307  *
308  * @param nr the network
309  * @param li the listener to read from
310  */
312 {
314 
315  (void) talloc_get_type_abort(nr, fr_network_t);
317 
318  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
319  if (!s) return;
320 
321  /*
322  * Go read the socket.
323  */
324  fr_network_read(nr->el, s->listen->fd, 0, s);
325 }
326 
327 
328 /** Inject a packet for a listener to write
329  *
330  * @param nr the network
331  * @param li the listener where the packet is being injected
332  * @param packet the packet to be written
333  * @param packet_len the length of the packet
334  * @param packet_ctx The packet context to write
335  * @param request_time when the packet was received.
336  */
337 void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len,
338  void *packet_ctx, fr_time_t request_time)
339 {
340  fr_message_t *lm;
342 
     /*
      * Describe the outgoing packet in a temporary channel data
      * structure. It is queued at PRIORITY_NOW, and request_time is
      * used both as the message time and the reply's request time.
      */
343  cd = (fr_channel_data_t) {
344  .m = (fr_message_t) {
346  .data_size = packet_len,
347  .when = request_time,
348  },
349 
350  .channel = {
351  .heap_id = FR_HEAP_INDEX_INVALID,
352  },
353 
354  .listen = li,
355  .priority = PRIORITY_NOW,
356  .reply.request_time = request_time,
357  };
358 
     /*
      * These copy the pointer values themselves (sizeof a pointer),
      * not the data they point to. memcpy() is used so that the
      * const qualifier doesn't get in the way of the assignment.
      */
359  memcpy(&cd.m.data, &packet, sizeof(packet)); /* const issues */
360  memcpy(&cd.packet_ctx, &packet_ctx, sizeof(packet_ctx)); /* const issues */
361 
362  /*
363  * Localize the message and insert it into the heap of pending messages.
364  */
365  lm = fr_message_localize(nr, &cd.m, sizeof(cd));
366  if (!lm) return;
367 
     /*
      * If the message can't be queued in nr->replies, release it
      * again. The function returns void, so the caller is not told.
      */
368  if (fr_heap_insert(&nr->replies, lm) < 0) {
369  fr_message_done(lm);
370  }
371 }
372 
373 
374 /** Inject a packet for a listener to read
375  *
376  * @param nr the network
377  * @param li the listener where the packet is being injected
378  * @param packet the packet to be injected
379  * @param packet_len the length of the packet
380  * @param recv_time when the packet was received.
381  * @return
382  * - <0 on error
383  * - 0 on success
384  */
385 int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
386 {
388  fr_network_inject_t my_inject;
389 
390  /*
391  * Can't inject to injection-less destinations.
392  */
393  if (!li->app_io->inject) return -1;
394 
395  /*
396  * Avoid a bounce through the event loop if we're being called from the network thread.
397  */
398  if (is_network_thread(nr)) {
400 
     /*
      * Look up the socket by its listener. An unknown listener
      * is an error.
      */
401  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
402  if (!s) return -1;
403 
404  /*
405  * Inject the packet. The master.c mod_read() routine will then take care of avoiding
406  * IO, and instead return the packet to the network side.
407  */
408  if (li->app_io->inject(li, packet, packet_len, recv_time) == 0) {
409  fr_network_read(nr->el, li->fd, 0, s);
410  }
411 
     /*
      * Note that 0 is returned here even when the inject()
      * callback returned non-zero and no read was triggered.
      */
412  return 0;
413  }
414 
416  if (!rb) return -1;
417 
     /*
      * Called from another thread: take a private, parentless copy
      * of the packet and send it to the network via a control
      * message.
      *
      * NOTE(review): the result of talloc_memdup() is not checked
      * before the message is sent - confirm the receiver copes with
      * a NULL packet.
      */
418  my_inject.listen = li;
419  my_inject.packet = talloc_memdup(NULL, packet, packet_len);
420  my_inject.packet_len = packet_len;
421  my_inject.recv_time = recv_time;
422 
423  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
424 }
425 
427 {
428  static fr_event_update_t pause_read[] = {
430  { 0 }
431  };
434 
435  if (nr->suspended) return;
436 
437  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
438  s != NULL;
439  s = fr_rb_iter_next_inorder(&iter)) {
441  }
442  nr->suspended = true;
443 }
444 
446 {
447  static fr_event_update_t resume_read[] = {
449  { 0 }
450  };
453 
454  if (!nr->suspended) return;
455 
456  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
457  s != NULL;
458  s = fr_rb_iter_next_inorder(&iter)) {
460  }
461  nr->suspended = false;
462 }
463 
/*
 * Smoothing of time deltas. RTT() evaluates to
 * (_new + _old * (IALPHA - 1)) / IALPHA, so each new sample
 * contributes 1/IALPHA of the result and the previous value
 * contributes the rest.
 */
464 #define IALPHA (8)
465 #define RTT(_old, _new) fr_time_delta_wrap((fr_time_delta_unwrap(_new) + (fr_time_delta_unwrap(_old) * (IALPHA - 1))) / IALPHA)
466 
467 /** Callback which handles a message being received on the network side.
468  *
469  * @param[in] ctx the network
470  * @param[in] ch the channel that the message is on.
471  * @param[in] cd the message (if any) to start with
472  */
473 static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
474 {
475  fr_network_t *nr = ctx;
476  fr_network_worker_t *worker;
477 
     /*
      * Record which channel the reply arrived on.
      */
478  cd->channel.ch = ch;
479 
480  /*
481  * Update stats for the worker.
482  */
483  worker = fr_channel_requestor_uctx_get(ch);
484  worker->stats.out++;
     /*
      * The projected CPU time is replaced by the figure the worker
      * reported in its reply.
      */
485  worker->cpu_time = cd->reply.cpu_time;
     /*
      * No positive prediction yet: take this sample as-is.
      * Otherwise fold the sample into the running average.
      */
486  if (!fr_time_delta_ispos(worker->predicted)) {
487  worker->predicted = cd->reply.processing_time;
488  } else {
489  worker->predicted = RTT(worker->predicted, cd->reply.processing_time);
490  }
491 
492  /*
493  * Unblock the worker.
494  */
495  if (worker->blocked) {
496  worker->blocked = false;
497  nr->num_blocked--;
499  }
500 
501  /*
502  * Ensure that heap insert works.
503  */
504  cd->channel.heap_id = FR_HEAP_INDEX_INVALID;
     /*
      * Queue the reply in nr->replies. A failed insert releases the
      * message and trips an assertion.
      */
505  if (fr_heap_insert(&nr->replies, cd) < 0) {
506  fr_message_done(&cd->m);
507  fr_assert(0 == 1);
508  }
509 }
510 
511 /** Handle a network control message callback for a channel
512  *
513  * This is called from the event loop when we get a notification
514  * from the event signalling pipe.
515  *
516  * @param[in] ctx the network
517  * @param[in] data the message
518  * @param[in] data_size size of the data
519  * @param[in] now the current time
520  */
521 static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
522 {
524  fr_channel_t *ch;
525  fr_network_t *nr = ctx;
526 
     /*
      * Decode the control message. This yields the event type, and
      * the channel it applies to in "ch".
      */
527  ce = fr_channel_service_message(now, &ch, data, data_size);
528  DEBUG3("Channel %s",
529  fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
530  switch (ce) {
531  case FR_CHANNEL_ERROR:
532  return;
533 
534  case FR_CHANNEL_EMPTY:
535  return;
536 
537  case FR_CHANNEL_NOOP:
538  break;
539 
     /*
      * Drain the channel: keep receiving until
      * fr_channel_recv_reply() returns false.
      */
541  fr_assert(ch != NULL);
542  while (fr_channel_recv_reply(ch));
543  break;
544 
546  fr_assert(0 == 1);
547  break;
548 
     /*
      * An OPEN event trips the assertion here.
      */
549  case FR_CHANNEL_OPEN:
550  fr_assert(0 == 1);
551  break;
552 
553  case FR_CHANNEL_CLOSE:
554  {
555  fr_network_worker_t *w = talloc_get_type_abort(fr_channel_requestor_uctx_get(ch),
557  int i;
558 
559  /*
560  * Remove this worker from the array
561  */
562  for (i = 0; i < nr->num_workers; i++) {
     /*
      * NOTE(review): this message is logged once per
      * loop iteration, not once per close - confirm
      * that is intended.
      */
563  DEBUG3("Worker acked our close request");
564  if (nr->workers[i] == w) {
565  nr->workers[i] = NULL;
566 
     /*
      * Last entry in the array, there's no hole to close.
      */
567  if (i == (nr->num_workers - 1)) break;
568 
569  /*
570  * Close the hole...
571  */
     /*
      * NOTE(review): the length given to memcpy() is
      * a count of entries, not of bytes. It looks as
      * if it should be scaled by the size of one
      * array entry. The source and destination also
      * overlap, which memcpy() does not support
      * (memmove() does). Confirm and fix.
      */
572  memcpy(&nr->workers[i], &nr->workers[i + 1], ((nr->num_workers - i) - 1));
573  break;
574  }
575  }
     /*
      * The count is decremented whether or not the worker
      * was found in the array.
      */
576  nr->num_workers--;
577  }
578  break;
579  }
580 }
581 
/*
 * Difference between the stats.in and stats.out counters of _x, i.e.
 * how many more items went in than came out.
 */
582 #define OUTSTANDING(_x) ((_x)->stats.in - (_x)->stats.out)
583 
584 /** Send a message on the "best" channel.
585  *
586  * @param nr the network
587  * @param cd the message we've received
588  */
590 {
591  fr_network_worker_t *worker;
592 
593  (void) talloc_get_type_abort(nr, fr_network_t);
594 
595 retry:
596  if (nr->num_workers == 1) {
597  worker = nr->workers[0];
598  if (worker->blocked) {
599  RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
600  "In single-threaded mode and worker is blocked");
601  drop:
602  worker->stats.dropped++;
603  return -1;
604  }
605 
606  } else if (nr->num_blocked == 0) {
607  int64_t cmp;
608  uint32_t one, two;
609 
610  one = fr_rand() % nr->num_workers;
611  do {
612  two = fr_rand() % nr->num_workers;
613  } while (two == one);
614 
615  /*
616  * Choose a worker based on minimizing the amount
617  * of future work it's being asked to do.
618  *
619  * If both workers have the same number of
620  * outstanding requests, then choose the worker
621  * which has used the least total CPU time.
622  */
623  cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
624  if (cmp < 0) {
625  worker = nr->workers[one];
626 
627  } else if (cmp > 0) {
628  worker = nr->workers[two];
629 
630  } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
631  worker = nr->workers[one];
632 
633  } else {
634  worker = nr->workers[two];
635  }
636  } else {
637  int i;
638  uint64_t min_outstanding = UINT64_MAX;
639  fr_network_worker_t *found = NULL;
640 
641  /*
642  * Some workers are blocked. Pick the worker
643  * with the least amount of future work to do.
644  */
645  for (i = 0; i < nr->num_workers; i++) {
646  uint64_t outstanding;
647 
648  worker = nr->workers[i];
649  if (worker->blocked) continue;
650 
651  outstanding = OUTSTANDING(worker);
652  if ((outstanding < min_outstanding) || !found) {
653  found = worker;
654  min_outstanding = outstanding;
655 
656  } else if (outstanding == min_outstanding) {
657  /*
658  * Queue lengths are the same.
659  * Choose this worker if it's
660  * less busy than the previous one we found.
661  */
662  if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
663  found = worker;
664  }
665  }
666  }
667 
668  if (!found) {
669  RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - Couldn't find active worker, "
670  "%u/%u workers are blocked", nr->num_blocked, nr->num_workers);
671  return -1;
672  }
673 
674  worker = found;
675  }
676 
677  (void) talloc_get_type_abort(worker, fr_network_worker_t);
678 
679  /*
680  * Too many outstanding packets for this worker. Drop
681  * the request.
682  *
683  * If the worker we've picked has too many outstanding
684  * packets, then we have either only one worker, in which
685  * case we should drop the packet. Or, we were unable to
686  * find a worker with smaller than max_outstanding
687  * packets. In which case all of the workers are likely
688  * at max_outstanding.
689  *
690  * In both cases, we should just drop the new packet.
691  */
692  fr_assert(worker->stats.in >= worker->stats.out);
693  if (nr->config.max_outstanding &&
694  (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
695  RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
696  goto drop;
697  }
698 
699  /*
700  * Send the message to the channel. If we fail, drop the
701  * packet. The only reason for failure is that the
702  * worker isn't servicing its input queue. When that
703  * happens, we have no idea what to do, and the whole
704  * thing falls over.
705  */
706  if (fr_channel_send_request(worker->channel, cd) < 0) {
707  worker->stats.dropped++;
708  worker->blocked = true;
709  nr->num_blocked++;
710 
711  RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - %u/%u workers are blocked",
712  nr->num_blocked, nr->num_workers);
713 
714  if (nr->num_blocked == nr->num_workers) {
715  fr_network_suspend(nr);
716  return -1;
717  }
718  goto retry;
719  }
720 
721  worker->stats.in++;
722 
723  /*
724  * We're projecting that the worker will use more CPU
725  * time to process this request. The CPU time will be
726  * updated with a more accurate number when we receive a
727  * reply from this channel.
728  */
729  worker->cpu_time = fr_time_delta_add(worker->cpu_time, worker->predicted);
730 
731  return 0;
732 }
733 
734 
735 /** Send a packet to the worker.
736  *
737  * MUST only be called from the network thread.
738  *
739  * @param nr the network
740  * @param parent the parent listener
741  * @param li the listener that the packet was "read" from. Can be "parent"
742  * @param buffer the packet to send
743  * @param buflen size of the packet to send
744  * @param recv_time of the packet
745  * @param packet_ctx for the packet
746  * @return
747  * - <0 on error
748  * - 0 on success
749  */
751  const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
752 {
753  fr_channel_data_t *cd;
755 
756  (void) talloc_get_type_abort(nr, fr_network_t);
758 
759  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
760  if (!s) return -1;
761 
762  cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, buflen);
763  if (!cd) return -1;
764 
765  cd->listen = parent;
767  cd->packet_ctx = packet_ctx;
768  cd->request.recv_time = recv_time;
769  memcpy(cd->m.data, buffer, buflen);
770  cd->m.when = fr_time();
771 
772  if (fr_network_send_request(nr, cd) < 0) {
773  talloc_free(cd->packet_ctx);
774  fr_message_done(&cd->m);
775  nr->stats.dropped++;
776  s->stats.dropped++;
777  return -1;
778  }
779 
780  s->outstanding++;
781  return 0;
782 }
783 
784 /** Get the number of outstanding packets
785  *
786  * @param nr the network
787  * @param li the listener that the packet was "read" from
788  * @return
789  * - <0 on error
790  * - the number of outstanding packets
791 */
794 
795  (void) talloc_get_type_abort(nr, fr_network_t);
797 
798  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
799  if (!s) return -1;
800 
801  return s->outstanding;
802 }
803 
804 /*
805  * Mark it as dead, but DON'T free it until all of the replies
806  * have come in.
807  */
809 {
810  int i;
811 
812  if (s->dead) return;
813 
814  s->dead = true;
815 
816  fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
817 
818 
819  for (i = 0; i < nr->max_workers; i++) {
820  if (!nr->workers[i]) continue;
821 
822  (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
823  }
824 
825  /*
826  * If there are no outstanding packets, then we can free
827  * it now.
828  */
829  if (!s->outstanding) {
830  talloc_free(s);
831  return;
832  }
833 
834  /*
835  * There are still outstanding packets. Leave it in the
836  * socket tree, so that replies from the worker can find
837  * it. When we've received all of the replies, then
838  * fr_network_post_event() will clean up this socket.
839  */
840 }
841 
842 /** Read a packet from the network.
843  *
844  * @param[in] el the event list.
845  * @param[in] sockfd the socket which is ready to read.
846  * @param[in] flags from kevent.
847  * @param[in] ctx the network socket context.
848  */
849 static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
850 {
851  int num_messages = 0;
852  fr_network_socket_t *s = ctx;
853  fr_network_t *nr = s->nr;
854  ssize_t data_size;
855  fr_channel_data_t *cd, *next;
856 
     /*
      * The event's FD must be the listener's FD. If not, give up
      * without reading anything.
      */
857  if (!fr_cond_assert_msg(s->listen->fd == sockfd, "Expected listen->fd (%u) to be equal event fd (%u)",
858  s->listen->fd, sockfd)) return;
859 
860  DEBUG3("Reading data from FD %u", sockfd);
861 
     /*
      * Reuse the message cached by a previous call, if there is
      * one. Otherwise a new one is needed, and failing to get it
      * closes the socket.
      */
862  if (!s->cd) {
864  if (!cd) {
865  ERROR("Failed allocating message size %zd! - Closing socket",
867  fr_network_socket_dead(nr, s);
868  return;
869  }
870  } else {
871  cd = s->cd;
872  }
873 
874  fr_assert(cd->m.data != NULL);
875 
876 next_message:
877  /*
878  * Poll this socket, but not too often. We have to go
879  * service other sockets, too.
880  */
     /*
      * The current message is cached in s->cd so that the next
      * call picks up where this one stopped.
      */
881  if (num_messages > 16) {
882  s->cd = cd;
883  return;
884  }
885 
887 
888  /*
889  * Read data from the network.
890  *
891  * Return of 0 means "no data", which is fine for UDP.
892  * For TCP, if an underlying read() on the TCP socket
893  * returns 0, (which signals that the FD is no longer
894  * usable) this function should return -1, so that the
895  * network side knows that it needs to close the
896  * connection.
897  */
898  data_size = s->listen->app_io->read(s->listen, &cd->packet_ctx, &cd->request.recv_time,
899  cd->m.data, cd->m.rb_size, &s->leftover);
900  if (data_size == 0) {
901  /*
902  * Cache the message for later. This is
903  * important for stream sockets, which can do
904  * partial reads into the current buffer. We
905  * need to be able to give the same buffer back
906  * to the stream socket for subsequent reads.
907  *
908  * Since we have a message set for each
909  * fr_io_socket_t, no "head of line"
910  * blocking issues can happen for stream sockets.
911  */
912  s->cd = cd;
913  return;
914  }
915 
916  /*
917  * Error: close the connection, and remove the fr_listen_t
918  */
919  if (data_size < 0) {
920 // fr_log(nr->log, L_DBG_ERR, "error from transport read on socket %d", sockfd);
921  fr_network_socket_dead(nr, s);
922  return;
923  }
     /*
      * We have a packet, so the message is no longer cached on the
      * socket.
      */
924  s->cd = NULL;
925 
926  DEBUG3("Read %zd byte(s) from FD %u", data_size, sockfd);
927  nr->stats.in++;
928  s->stats.in++;
929 
930  /*
931  * Initialize the rest of the fields of the channel data.
932  *
933  * We always use "now" as the time of the message, as the
934  * packet MAY be a duplicate packet magically resurrected
935  * from the past. i.e. If the read routines are doing
936  * dedup, then they notice that the packet is a
937  * duplicate. In that case, they send over a copy of the
938  * packet, BUT with the original timestamp. This
939  * information tells the worker that the packet is a
940  * duplicate.
941  */
942  cd->m.when = fr_time();
943  cd->listen = s->listen;
944 
945  /*
946  * Nothing in the buffer yet. Allocate room for one
947  * packet.
948  */
949  if ((cd->m.data_size == 0) && (!s->leftover)) {
950 
951  (void) fr_message_alloc(s->ms, &cd->m, data_size);
952  next = NULL;
953 
954  } else {
955  /*
956  * There are leftover bytes in the buffer, feed
957  * them to the next round of reading.
958  */
959  next = (fr_channel_data_t *) fr_message_alloc_reserve(s->ms, &cd->m, data_size, s->leftover,
961  if (!next) {
962  PERROR("Failed reserving partial packet.");
963  // @todo - probably close the socket...
964  fr_assert(0 == 1);
965  }
966  }
967 
968  /*
969  * Set the priority. Which incidentally also checks if
970  * we're allowed to read this particular kind of packet.
971  *
972  * That check is because the app_io handlers just read
973  * packets, and don't really have access to the parent
974  * "list of allowed packet types". So we have to do the
975  * work here in a callback.
976  *
977  * That should probably be fixed...
978  */
979  if (s->listen->app->priority) {
980  int priority;
981 
982  priority = s->listen->app->priority(s->listen->app_instance, cd->m.data, data_size);
     /*
      * A priority of zero or less means the packet is
      * dropped and counted, exactly as for a failed send.
      */
983  if (priority <= 0) goto discard;
984 
985  cd->priority = priority;
986  }
987 
988  if (fr_network_send_request(nr, cd) < 0) {
989  discard:
990  talloc_free(cd->packet_ctx); /* not sure what else to do here */
991  fr_message_done(&cd->m);
992  nr->stats.dropped++;
993  s->stats.dropped++;
994 
995  } else {
996  /*
997  * One more packet sent to a worker.
998  */
999  s->outstanding++;
1000  }
1001 
1002  /*
1003  * If there is a next message, go read it from the buffer.
1004  *
1005  * @todo - note that this calls read(), even if the
1006  * app_io has paused the reader. We likely want to be
1007  * able to check that, too. We might just remove this
1008  * "goto"...
1009  */
1010  if (next) {
1011  cd = next;
1012  num_messages++;
1013  goto next_message;
1014  }
1015 }
1016 
/** Copy a packet into a new message and send it to a worker
 *
 * @param nr the network
 * @param li the listener the packet is associated with
 * @param packet_ctx the packet context, freed here if the send fails
 * @param data the packet data, which is copied
 * @param data_len length of the packet data
 * @param recv_time used as the time of the message
 * @return
 * - -1 if the listener has no socket, or the message can't be allocated
 * - 0 otherwise, including when the packet was dropped because it
 *   could not be sent to a worker
 */
1017 int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
1018 {
1019  fr_channel_data_t *cd;
1021 
1022  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1023  if (!s) return -1;
1024 
1025  cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, data_len);
1026  if (!cd) return -1;
1027 
1028  s->stats.in++;
1029 
1030  cd->priority = PRIORITY_NORMAL;
1031 
1032  cd->m.when = recv_time;
1033  cd->listen = li;
1034  cd->packet_ctx = packet_ctx;
1035 
1036  memcpy(cd->m.data, data, data_len);
1037 
      /*
       * On failure the packet context and the message are released,
       * and the drop is counted on both the network and the socket.
       */
1038  if (fr_network_send_request(nr, cd) < 0) {
1039  talloc_free(packet_ctx);
1040  fr_message_done(&cd->m);
1041  nr->stats.dropped++;
1042  s->stats.dropped++;
1043 
1044  } else {
1045  /*
1046  * One more packet sent to a worker.
1047  */
1048  s->outstanding++;
1049  }
1050 
1051  return 0;
1052 }
1053 
1054 
1055 /** Get a notification that a vnode changed
1056  *
1057  * @param[in] el the event list.
1058  * @param[in] sockfd the socket which is ready to read.
1059  * @param[in] fflags from kevent.
1060  * @param[in] ctx the network socket context.
1061  */
1062 static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
1063 {
1064  fr_network_socket_t *s = ctx;
1065  fr_network_t *nr = s->nr;
1066 
1067  fr_cond_assert(s->listen->fd == sockfd);
1068 
1069  DEBUG3("network vnode");
1070 
1071  /*
1072  * Tell the IO handler that something has happened to the
1073  * file.
1074  */
1075  s->listen->app_io->vnode(s->listen, fflags);
1076 }
1077 
1078 
1079 /** Handle errors for a socket.
1080  *
1081  * @param[in] el the event list
1082  * @param[in] sockfd the socket which has a fatal error.
1083  * @param[in] flags returned by kevent.
1084  * @param[in] fd_errno returned by kevent.
1085  * @param[in] ctx the network socket context.
1086  */
1088  int fd_errno, void *ctx)
1089 {
1090  fr_network_socket_t *s = ctx;
1091  fr_network_t *nr = s->nr;
1092 
1093  if (s->listen->app_io->error) {
1094  s->listen->app_io->error(s->listen);
1095 
1096  } else if (flags & EV_EOF) {
1097  DEBUG2("Socket %s closed by peer", s->listen->name);
1098 
1099  } else {
1100  ERROR("Socket %s errored - %s", s->listen->name, fr_syserror(fd_errno));
1101  }
1102 
1103  fr_network_socket_dead(s->nr, s);
1104 }
1105 
1106 
1107 static fr_event_update_t const pause_write[] = {
1109  { 0 }
1110 };
1111 
1114  { 0 }
1115 };
1116 
1117 
1118 /** Write packets to the network.
1119  *
1120  * @param el the event list
1121  * @param sockfd the socket which is ready to write
1122  * @param flags returned by kevent.
1123  * @param ctx the network socket context.
1124  */
1125 static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
1126 {
1127  fr_network_socket_t *s = ctx;
1128  fr_listen_t *li = s->listen;
1129  fr_network_t *nr = s->nr;
1130  fr_channel_data_t *cd;
1131 
1132  (void) talloc_get_type_abort(nr, fr_network_t);
1133 
1134  /*
1135  * Start with the currently pending message, and then
1136  * work through the priority heap.
1137  */
1138  if (s->pending) {
1139  cd = s->pending;
1140  s->pending = NULL;
1141 
1142  } else {
1143  cd = fr_heap_pop(&s->waiting);
1144  }
1145 
1146  while (cd != NULL) {
1147  int rcode;
1148 
1149  fr_assert(li == cd->listen);
1150  rcode = li->app_io->write(li, cd->packet_ctx,
1151  cd->reply.request_time,
1152  cd->m.data, cd->m.data_size, s->written);
1153 
1154  /*
1155  * As a special case, allow write() to return
1156  * "0", which means "close the socket".
1157  */
1158  if (rcode == 0) goto dead;
1159 
1160  /*
1161  * Or we have a write error.
1162  */
1163  if (rcode < 0) {
1164  /*
1165  * Stop processing the heap, and set the
1166  * pending message to the current one.
1167  */
1168  if (errno == EWOULDBLOCK) {
1169  save_pending:
1170  fr_assert(!s->pending);
1171 
1172  if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1173  fr_message_t *lm;
1174 
1175  lm = fr_message_localize(s, &cd->m, sizeof(*cd));
1176  if (!lm) {
1177  ERROR("Failed saving pending packet");
1178  goto dead;
1179  }
1180 
1181  cd = (fr_channel_data_t *) lm;
1182  }
1183 
1184  if (!s->blocked) {
1186  PERROR("Failed adding write callback to event loop");
1187  goto dead;
1188  }
1189 
1190  s->blocked = true;
1191  }
1192 
1193  s->pending = cd;
1194  return;
1195  }
1196 
1197  /*
1198  * As a special hack, check for something
1199  * that will never be returned from a
1200  * real write() routine. Which then
1201  * signals to us that we have to close
1202  * the socket, but NOT complain about it.
1203  */
1204  if (errno == ECONNREFUSED) goto dead;
1205 
1206  PERROR("Failed writing to socket %s", s->listen->name);
1207  if (li->app_io->error) li->app_io->error(li);
1208 
1209  dead:
1210  fr_message_done(&cd->m);
1211  fr_network_socket_dead(nr, s);
1212  return;
1213  }
1214 
1215  /*
1216  * If we've done a partial write, localize the message and continue.
1217  */
1218  if ((size_t) rcode < cd->m.data_size) {
1219  s->written = rcode;
1220  goto save_pending;
1221  }
1222 
1223  s->written = 0;
1224 
1225  /*
1226  * Reset for the next message.
1227  */
1228  fr_message_done(&cd->m);
1229  nr->stats.out++;
1230  s->stats.out++;
1231 
1232  /*
1233  * Grab the next entry.
1234  */
1235  cd = fr_heap_pop(&s->waiting);
1236  }
1237 
1238  /*
1239  * We've successfully written all of the packets. Remove
1240  * the write callback.
1241  */
1243  PERROR("Failed removing write callback from event loop");
1244  fr_network_socket_dead(nr, s);
1245  }
1246 
1247  s->blocked = false;
1248 }
1249 
1251 {
1252  fr_network_t *nr = s->nr;
1253  fr_channel_data_t *cd;
1254 
1255  fr_assert(s->outstanding == 0);
1256 
1257  fr_rb_delete(nr->sockets, s);
1258  fr_rb_delete(nr->sockets_by_num, s);
1259 
1260  fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
1261 
1262  if (s->listen->app_io->close) {
1263  s->listen->app_io->close(s->listen);
1264  } else {
1265  close(s->listen->fd);
1266  }
1267 
1268  if (s->pending) {
1269  fr_message_done(&s->pending->m);
1270  s->pending = NULL;
1271  }
1272 
1273  /*
1274  * Clean up any queued entries.
1275  */
1276  while ((cd = fr_heap_pop(&s->waiting)) != NULL) {
1277  fr_message_done(&cd->m);
1278  }
1279 
1280  talloc_free(s->waiting);
1281 
1282  return 0;
1283 }
1284 
1285 
1286 /** Handle a network control message callback for a new listener
1287  *
1288  * @param[in] ctx the network
1289  * @param[in] data the message
1290  * @param[in] data_size size of the data
1291  * @param[in] now the current time
1292  */
1293 static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1294 {
1295  fr_network_t *nr = ctx;
1296  fr_listen_t *listen;
1297 
1298  fr_assert(data_size == sizeof(listen));
1299 
1300  if (data_size != sizeof(listen)) return;
1301 
1302  memcpy(&listen, data, sizeof(listen));
1303 
1304  (void) fr_network_listen_add_self(nr, listen);
1305 }
1306 
1308 {
1310  fr_app_io_t const *app_io;
1311  size_t size;
1312  int num_messages;
1313 
1314  fr_assert(listen->app_io != NULL);
1315 
1316  /*
1317  * Non-socket listeners just get told about the event
1318  * list, and nothing else.
1319  */
1320  if (listen->non_socket_listener) {
1321  fr_assert(listen->app_io->event_list_set != NULL);
1322  fr_assert(!listen->app_io->read);
1323  fr_assert(!listen->app_io->write);
1324 
1325  listen->app_io->event_list_set(listen, nr->el, nr);
1326 
1327  /*
1328  * We use fr_log() here to avoid the "Network - " prefix.
1329  */
1330  fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listener %s bound to virtual server %s",
1331  listen->name, cf_section_name2(listen->server_cs));
1332 
1333  return 0;
1334  }
1335 
1336  s = talloc_zero(nr, fr_network_socket_t);
1337  fr_assert(s != NULL);
1338 
1339  s->nr = nr;
1340  s->listen = listen;
1341  s->number = nr->num_sockets++;
1342 
1343  MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1344 
1345  talloc_set_destructor(s, _network_socket_free);
1346 
1347  /*
1348  * Put reasonable limits on the ring buffer size. Then
1349  * round it up to the nearest power of 2, which is
1350  * required by the ring buffer code.
1351  */
1352  num_messages = s->listen->num_messages;
1353  if (num_messages < 8) num_messages = 8;
1354 
1355  size = s->listen->default_message_size * num_messages;
1356  if (size < (1 << 17)) size = (1 << 17);
1357  if (size > (100 * 1024 * 1024)) size = (100 * 1024 * 1024);
1358 
1359  /*
1360  * Allocate the ring buffer for messages and packets.
1361  */
1362  s->ms = fr_message_set_create(s, num_messages,
1363  sizeof(fr_channel_data_t),
1364  size);
1365  if (!s->ms) {
1366  PERROR("Failed creating message buffers for network IO");
1367  talloc_free(s);
1368  return -1;
1369  }
1370 
1371  app_io = s->listen->app_io;
1373 
1374  if (fr_event_fd_insert(nr, NULL, nr->el, s->listen->fd,
1378  s) < 0) {
1379  PERROR("Failed adding new socket to network event loop");
1380  talloc_free(s);
1381  return -1;
1382  }
1383 
1384  /*
1385  * Start off with write updates being paused. We don't
1386  * care about being able to write if there's nothing to
1387  * write.
1388  */
1390 
1391  /*
1392  * Add the listener before calling the app_io, so that
1393  * the app_io can find the listener which we're adding
1394  * here.
1395  */
1396  (void) fr_rb_insert(nr->sockets, s);
1397  (void) fr_rb_insert(nr->sockets_by_num, s);
1398 
1399  if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1400 
1401  /*
1402  * We use fr_log() here to avoid the "Network - " prefix.
1403  */
1404  fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listening on %s bound to virtual server %s",
1406 
1407  DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
1408 
1409  return 0;
1410 }
1411 
1412 /** Handle a network control message callback for a new "watch directory"
1413  *
1414  * @param[in] ctx the network
1415  * @param[in] data the message
1416  * @param[in] data_size size of the data
1417  * @param[in] now the current time
1418  */
1419 static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1420 {
1421  int num_messages;
1422  fr_network_t *nr = ctx;
1424  fr_app_io_t const *app_io;
1426 
1427  fr_assert(data_size == sizeof(s->listen));
1428 
1429  if (data_size != sizeof(s->listen)) return;
1430 
1431  s = talloc_zero(nr, fr_network_socket_t);
1432  fr_assert(s != NULL);
1433 
1434  s->nr = nr;
1435  memcpy(&s->listen, data, sizeof(s->listen));
1436  s->number = nr->num_sockets++;
1437 
1438  MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1439 
1440  talloc_set_destructor(s, _network_socket_free);
1441 
1442  /*
1443  * Allocate the ring buffer for messages and packets.
1444  */
1445  num_messages = s->listen->num_messages;
1446  if (num_messages < 8) num_messages = 8;
1447 
1448  s->ms = fr_message_set_create(s, num_messages,
1449  sizeof(fr_channel_data_t),
1451  if (!s->ms) {
1452  PERROR("Failed creating message buffers for directory IO");
1453  talloc_free(s);
1454  return;
1455  }
1456 
1457  app_io = s->listen->app_io;
1458 
1459  if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1460 
1462 
1463  if (fr_event_filter_insert(nr, NULL, nr->el, s->listen->fd, s->filter,
1464  &funcs,
1465  app_io->error ? fr_network_error : NULL,
1466  s) < 0) {
1467  PERROR("Failed adding directory monitor event loop");
1468  talloc_free(s);
1469  return;
1470  }
1471 
1472  (void) fr_rb_insert(nr->sockets, s);
1473  (void) fr_rb_insert(nr->sockets_by_num, s);
1474 
1475  DEBUG3("Using new socket with FD %d", s->listen->fd);
1476 }
1477 
1478 /** Handle a network control message callback for a new worker
1479  *
1480  * @param[in] ctx the network
1481  * @param[in] data the message
1482  * @param[in] data_size size of the data
1483  * @param[in] now the current time
1484  */
1485 static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1486 {
1487  int i;
1488  fr_network_t *nr = ctx;
1489  fr_worker_t *worker;
1491 
1492  fr_assert(data_size == sizeof(worker));
1493 
1494  memcpy(&worker, data, data_size);
1495  (void) talloc_get_type_abort(worker, fr_worker_t);
1496 
1497  MEM(w = talloc_zero(nr, fr_network_worker_t));
1498 
1499  w->worker = worker;
1500  w->channel = fr_worker_channel_create(worker, w, nr->control);
1502  fr_fatal_assert_msg(w->channel, "Failed creating new channel");
1503 
1506 
1507  /*
1508  * FIXME: This creates a race in the network loop
1509  * exit condition, because it can theoretically
1510  * be signalled to exit before the workers have
1511  * ACKd channel creation.
1512  */
1513  nr->num_workers++;
1514 
1515  /*
1516  * Insert the worker into the array of workers.
1517  */
1518  for (i = 0; i < nr->max_workers; i++) {
1519  if (nr->workers[i]) continue;
1520 
1521  nr->workers[i] = w;
1522  return;
1523  }
1524 
1525  /*
1526  * Run out of room to put workers!
1527  */
1528  fr_assert(0 == 1);
1529 }
1530 
1531 /** Handle a network control message callback for a packet sent to a socket
1532  *
1533  * @param[in] ctx the network
1534  * @param[in] data the message
1535  * @param[in] data_size size of the data
1536  * @param[in] now the current time
1537  */
1538 static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1539 {
1540  fr_network_t *nr = ctx;
1541  fr_network_inject_t my_inject;
1543 
1544  fr_assert(data_size == sizeof(my_inject));
1545 
1546  memcpy(&my_inject, data, data_size);
1547  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = my_inject.listen });
1548  if (!s) {
1549  talloc_free(my_inject.packet); /* MUST be it's own TALLOC_CTX */
1550  return;
1551  }
1552 
1553  /*
1554  * Inject the packet, and then read it back from the
1555  * network.
1556  */
1557  if (s->listen->app_io->inject(s->listen, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
1558  fr_network_read(nr->el, s->listen->fd, 0, s);
1559  }
1560 
1561  talloc_free(my_inject.packet);
1562 }
1563 
1564 /** Run the event loop 'pre' callback
1565  *
1566  * This function MUST DO NO WORK. All it does is check if there's
1567  * work, and tell the event code to return to the main loop if
1568  * there's work to do.
1569  *
1570  * @param[in] now the current time.
1571  * @param[in] wake the time when the event loop will wake up.
1572  * @param[in] uctx the network
1573  */
1575 {
1576  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1577 
1578  if (fr_heap_num_elements(nr->replies) > 0) return 1;
1579 
1580  return 0;
1581 }
1582 
1583 /** Handle replies after all FD and timer events have been serviced
1584  *
1585  * @param el the event loop
1586  * @param now the current time (mostly)
1587  * @param uctx the fr_network_t
1588  */
1590 {
1591  fr_channel_data_t *cd;
1592  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1593 
1594  /*
1595  * Pull the replies off of our global heap, and try to
1596  * push them to the individual sockets.
1597  */
1598  while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1599  fr_listen_t *li;
1601 
1602  li = cd->listen;
1603 
1604  /*
1605  * @todo - cache this somewhere so we don't need
1606  * to do an rbtree lookup for every packet.
1607  */
1608  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1609 
1610  /*
1611  * This shouldn't happen, but be safe...
1612  */
1613  if (!s) {
1614  fr_message_done(&cd->m);
1615  continue;
1616  }
1617 
1618  if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1619  fr_assert(s->outstanding > 0);
1620  s->outstanding--;
1621  }
1622 
1623  /*
1624  * Just mark the message done, and skip it.
1625  */
1626  if (s->dead) {
1627  fr_message_done(&cd->m);
1628 
1629  /*
1630  * No more packets, it's safe to delete
1631  * the socket.
1632  */
1633  if (!s->outstanding) talloc_free(s);
1634 
1635  continue;
1636  }
1637 
1638  /*
1639  * No data to write to the socket, so we skip the message.
1640  */
1641  if (!cd->m.data_size) {
1642  fr_message_done(&cd->m);
1643  continue;
1644  }
1645 
1646  /*
1647  * No pending message, let's try writing it.
1648  *
1649  * If there is a pending message, then we're
1650  * waiting for IO write to become ready.
1651  */
1652  if (!s->pending) {
1653  fr_assert(!s->blocked);
1654  (void) fr_heap_insert(&s->waiting, cd);
1655  fr_network_write(nr->el, s->listen->fd, 0, s);
1656  }
1657  }
1658 }
1659 
1660 /** Stop a network thread in an orderly way
1661  *
1662  * @param[in] nr the network to stop
1663  */
1665 {
1666  fr_channel_data_t *cd;
1667 
1668  (void) talloc_get_type_abort(nr, fr_network_t);
1669 
1670  /*
1671  * Close the network sockets
1672  */
1673  {
1674  fr_network_socket_t **sockets;
1675  size_t len;
1676  size_t i;
1677 
1678  if (fr_rb_flatten_inorder(nr, (void ***)&sockets, nr->sockets) < 0) return -1;
1679  len = talloc_array_length(sockets);
1680 
1681  for (i = 0; i < len; i++) {
1682  /*
1683  * Force to zero so we don't trigger asserts
1684  * if packets are being processed and the
1685  * server exits.
1686  */
1687  sockets[i]->outstanding = 0;
1688  talloc_free(sockets[i]);
1689  }
1690 
1691  talloc_free(sockets);
1692  }
1693 
1694 
1695  /*
1696  * Clean up all outstanding replies.
1697  *
1698  * We can't do this after signalling the
1699  * workers to close, because they free
1700  * their message sets, and we end up
1701  * getting random use-after-free errors
1702  * as there's a race between the network
1703  * popping replies, and the workers
1704  * freeing their message sets.
1705  *
1706  * This isn't perfect, and we might still
1707  * lose some replies, but it's good enough
1708  * for now.
1709  *
1710  * @todo - call transport "done" for the reply, so that
1711  * it knows the replies are done, too.
1712  */
1713  while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1714  fr_message_done(&cd->m);
1715  }
1716 
1717  /*
1718  * Signal the workers that we're closing
1719  *
1720  * nr->num_workers is decremented every
1721  * time a worker closes a socket.
1722  *
1723  * When nr->num_workers == 0, the event
1724  * loop (fr_network()) will exit.
1725  */
1726  {
1727  int i;
1728 
1729  for (i = 0; i < nr->num_workers; i++) {
1730  fr_network_worker_t *worker = nr->workers[i];
1731 
1733  }
1734  }
1735 
1736  (void) fr_event_pre_delete(nr->el, fr_network_pre_event, nr);
1738  nr->exiting = true;
1740 
1741  return 0;
1742 }
1743 
1744 /** Read handler for signal pipe
1745  *
1746  */
1747 static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
1748 {
1749  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1750  uint8_t buff;
1751 
1752  if (read(fd, &buff, sizeof(buff)) < 0) {
1753  ERROR("Failed reading signal - %s", fr_syserror(errno));
1754  return;
1755  }
1756 
1757  fr_assert(buff == 1);
1758 
1759  /*
1760  * fr_network_stop() will signal the workers
1761  * to exit (by closing their channels).
1762  *
1763  * When we get the ack, we decrement our
1764  * nr->num_workers counter.
1765  *
1766  * When the counter reaches 0, the event loop
1767  * exits.
1768  */
1769  DEBUG2("Signalled to exit");
1770  fr_network_destroy(nr);
1771 }
1772 
1773 /** The main network worker function.
1774  *
1775  * @param[in] nr the network data structure to run.
1776  */
1778 {
1779  /*
1780  * Run until we're told to exit AND the number of
1781  * workers has dropped to zero.
1782  *
1783  * This is important as if we exit too early we
1784  * free the channels out from underneath the
1785  * workers and they read uninitialised memory.
1786  *
1787  * Whenever a worker ACKs our close notification
1788  * nr->num_workers is decremented, so when
1789  * nr->num_workers == 0, all workers have ACKd
1790  * our close and are no longer using the channel.
1791  */
1792  while (likely(!(nr->exiting && (nr->num_workers == 0)))) {
1793  bool wait_for_event;
1794  int num_events;
1795 
1796  /*
1797  * There are runnable requests. We still service
1798  * the event loop, but we don't wait for events.
1799  */
1800  wait_for_event = (fr_heap_num_elements(nr->replies) == 0);
1801 
1802  /*
1803  * Check the event list. If there's an error
1804  * (e.g. exit), we stop looping and clean up.
1805  */
1806  DEBUG3("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1807  num_events = fr_event_corral(nr->el, fr_time(), wait_for_event);
1808  DEBUG3("%u event(s) pending%s",
1809  num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1810  if (num_events < 0) break;
1811 
1812  /*
1813  * Service outstanding events.
1814  */
1815  if (num_events > 0) {
1816  DEBUG4("Servicing event(s)");
1817  fr_event_service(nr->el);
1818  }
1819  }
1820  return;
1821 }
1822 
1823 /** Signal a network thread to exit
1824  *
1825  * @note Request to exit will be processed asynchronously.
1826  *
1827  * @param[in] nr the network data structure to manage
1828  * @return
1829  * - 0 on success.
1830  * - -1 on failure.
1831  */
1833 {
1834  if (write(nr->signal_pipe[1], &(uint8_t){ 0x01 }, 1) < 0) {
1835  fr_strerror_printf("Failed signalling network thread to exit - %s", fr_syserror(errno));
1836  return -1;
1837  }
1838 
1839  return 0;
1840 }
1841 
1842 /** Free any resources associated with a network thread
1843  *
1844  */
1846 {
1847  if (nr->signal_pipe[0] >= 0) close(nr->signal_pipe[0]);
1848  if (nr->signal_pipe[1] >= 0) close(nr->signal_pipe[1]);
1849 
1850  return 0;
1851 }
1852 
1853 /** Create a network
1854  *
1855  * @param[in] ctx The talloc ctx
1856  * @param[in] el The event list
1857  * @param[in] name Networker identifier.
1858  * @param[in] logger The destination for all logging messages
1859  * @param[in] lvl Log level
1860  * @param[in] config configuration structure.
1861  * @return
1862  * - NULL on error
1863  * - fr_network_t on success
1864  */
1865 fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
1866  fr_log_t const *logger, fr_log_lvl_t lvl,
1867  fr_network_config_t const *config)
1868 {
1869  fr_network_t *nr;
1870 
1871  nr = talloc_zero(ctx, fr_network_t);
1872  if (!nr) {
1873  fr_strerror_const("Failed allocating memory");
1874  return NULL;
1875  }
1876  talloc_set_destructor(nr, _fr_network_free);
1877 
1878  nr->name = talloc_strdup(nr, name);
1879 
1880  nr->thread_id = pthread_self();
1881  nr->el = el;
1882  nr->log = logger;
1883  nr->lvl = lvl;
1884 
1885  nr->max_workers = MAX_WORKERS;
1886  nr->num_workers = 0;
1887  nr->signal_pipe[0] = -1;
1888  nr->signal_pipe[1] = -1;
1889  if (config) nr->config = *config;
1890 
1891  nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
1892  if (!nr->aq_control) {
1893  talloc_free(nr);
1894  return NULL;
1895  }
1896 
1897  nr->control = fr_control_create(nr, el, nr->aq_control);
1898  if (!nr->control) {
1899  fr_strerror_const_push("Failed creating control queue");
1900  fail:
1901  talloc_free(nr);
1902  return NULL;
1903  }
1904 
1905  /*
1906  * @todo - rely on thread-local variables. And then the
1907  * various users of this can check if (rb == nr->rb), and
1908  * if so, skip the whole control plane / kevent /
1909  * whatever roundabout thing.
1910  */
1912  if (!nr->rb) {
1913  fr_strerror_const_push("Failed creating ring buffer");
1914  fail2:
1915  talloc_free(nr->control);
1916  goto fail;
1917  }
1918 
1920  fr_strerror_const_push("Failed adding channel callback");
1921  goto fail2;
1922  }
1923 
1925  fr_strerror_const_push("Failed adding socket callback");
1926  goto fail2;
1927  }
1928 
1930  fr_strerror_const_push("Failed adding socket callback");
1931  goto fail2;
1932  }
1933 
1935  fr_strerror_const_push("Failed adding worker callback");
1936  goto fail2;
1937  }
1938 
1940  fr_strerror_const_push("Failed adding packet injection callback");
1941  goto fail2;
1942  }
1943 
1944  /*
1945  * Create the various heaps.
1946  */
1948  if (!nr->sockets) {
1949  fr_strerror_const_push("Failed creating listen tree for sockets");
1950  goto fail2;
1951  }
1952 
1954  if (!nr->sockets_by_num) {
1955  fr_strerror_const_push("Failed creating number tree for sockets");
1956  goto fail2;
1957  }
1958 
1959  nr->replies = fr_heap_alloc(nr, reply_cmp, fr_channel_data_t, channel.heap_id, 0);
1960  if (!nr->replies) {
1961  fr_strerror_const_push("Failed creating heap for replies");
1962  goto fail2;
1963  }
1964 
1965  if (fr_event_pre_insert(nr->el, fr_network_pre_event, nr) < 0) {
1966  fr_strerror_const("Failed adding pre-check to event list");
1967  goto fail2;
1968  }
1969 
1970  if (fr_event_post_insert(nr->el, fr_network_post_event, nr) < 0) {
1971  fr_strerror_const("Failed inserting post-processing event");
1972  goto fail2;
1973  }
1974 
1975  if (pipe(nr->signal_pipe) < 0) {
1976  fr_strerror_printf("Failed initialising signal pipe - %s", fr_syserror(errno));
1977  goto fail2;
1978  }
1979  if (fr_nonblock(nr->signal_pipe[0]) < 0) goto fail2;
1980  if (fr_nonblock(nr->signal_pipe[1]) < 0) goto fail2;
1981 
1982  if (fr_event_fd_insert(nr, NULL, nr->el, nr->signal_pipe[0], _signal_pipe_read, NULL, NULL, nr) < 0) {
1983  fr_strerror_const("Failed inserting event for signal pipe");
1984  goto fail2;
1985  }
1986 
1987  return nr;
1988 }
1989 
1990 int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
1991 {
1992  if (num < 0) return -1;
1993  if (num == 0) return 0;
1994 
1995  stats[0] = nr->stats.in;
1996  if (num >= 2) stats[1] = nr->stats.out;
1997  if (num >= 3) stats[2] = nr->stats.dup;
1998  if (num >= 4) stats[3] = nr->stats.dropped;
1999  if (num >= 5) stats[4] = nr->num_workers;
2000 
2001  if (num <= 5) return num;
2002 
2003  return 5;
2004 }
2005 
2006 void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
2007 {
2008  int i;
2009 
2010  /*
2011  * Dump all of the channel statistics.
2012  */
2013  for (i = 0; i < nr->max_workers; i++) {
2014  if (!nr->workers[i]) continue;
2015 
2016  fr_channel_stats_log(nr->workers[i]->channel, log, __FILE__, __LINE__);
2017  }
2018 }
2019 
2020 static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2021 {
2022  fr_network_t const *nr = ctx;
2023 
2024  fprintf(fp, "count.in\t%" PRIu64 "\n", nr->stats.in);
2025  fprintf(fp, "count.out\t%" PRIu64 "\n", nr->stats.out);
2026  fprintf(fp, "count.dup\t%" PRIu64 "\n", nr->stats.dup);
2027  fprintf(fp, "count.dropped\t%" PRIu64 "\n", nr->stats.dropped);
2028  fprintf(fp, "count.sockets\t%u\n", fr_rb_num_elements(nr->sockets));
2029 
2030  return 0;
2031 }
2032 
2033 static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2034 {
2035  fr_network_t const *nr = ctx;
2036  fr_rb_iter_inorder_t iter;
2038 
2039  // @todo - note that this isn't thread-safe!
2040 
2041  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
2042  s != NULL;
2043  s = fr_rb_iter_next_inorder(&iter)) {
2044  if (!s->listen->app_io->get_name) {
2045  fprintf(fp, "%s\n", s->listen->app_io->common.name);
2046  } else {
2047  fprintf(fp, "%d\t%s\n", s->number, s->listen->app_io->get_name(s->listen));
2048  }
2049  }
2050  return 0;
2051 }
2052 
2053 static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
2054 {
2055  fr_network_t const *nr = ctx;
2057 
2058  s = fr_rb_find(nr->sockets_by_num, &(fr_network_socket_t){ .number = info->box[0]->vb_uint32 });
2059  if (!s) {
2060  fprintf(fp_err, "No such socket number '%s'.\n", info->argv[0]);
2061  return -1;
2062  }
2063 
2064  fprintf(fp, "count.in\t%" PRIu64 "\n", s->stats.in);
2065  fprintf(fp, "count.out\t%" PRIu64 "\n", s->stats.out);
2066  fprintf(fp, "count.dup\t%" PRIu64 "\n", s->stats.dup);
2067  fprintf(fp, "count.dropped\t%" PRIu64 "\n", s->stats.dropped);
2068 
2069  return 0;
2070 }
2071 
2072 
2074  {
2075  .parent = "stats",
2076  .name = "network",
2077  .help = "Statistics for network threads.",
2078  .read_only = true
2079  },
2080 
2081  {
2082  .parent = "stats network",
2083  .add_name = true,
2084  .name = "self",
2085  .func = cmd_stats_self,
2086  .help = "Show statistics for a specific network thread.",
2087  .read_only = true
2088  },
2089 
2090  {
2091  .parent = "stats network",
2092  .add_name = true,
2093  .name = "socket",
2094  .syntax = "INTEGER",
2095  .func = cmd_stats_socket,
2096  .help = "Show statistics for a specific socket",
2097  .read_only = true
2098  },
2099 
2100  {
2101  .parent = "show",
2102  .name = "network",
2103  .help = "Show information about network threads.",
2104  .read_only = true
2105  },
2106 
2107  {
2108  .parent = "show network",
2109  .add_name = true,
2110  .name = "socket",
2111  .syntax = "list",
2112  .func = cmd_socket_list,
2113  .help = "List the sockets associated with this network thread.",
2114  .read_only = true
2115  },
2116 
2118 };
static int const char char buffer[256]
Definition: acutest.h:574
fr_io_close_t close
Close the transport.
Definition: app_io.h:60
fr_io_data_read_t read
Read from a socket to a data buffer.
Definition: app_io.h:47
module_t common
Common fields to all loadable modules.
Definition: app_io.h:34
fr_io_signal_t error
There was an error on the socket.
Definition: app_io.h:59
fr_app_event_list_set_t event_list_set
Called by the network thread to pass an event list for use by the app_io_t.
Definition: app_io.h:36
fr_io_data_inject_t inject
Inject a packet into a socket.
Definition: app_io.h:50
fr_io_data_vnode_t vnode
Handle notifications that the VNODE has changed.
Definition: app_io.h:52
fr_io_data_write_t write
Write from a data buffer to a socket.
Definition: app_io.h:48
fr_io_name_t get_name
get the socket name
Definition: app_io.h:70
Public structure describing an I/O path for a protocol.
Definition: app_io.h:33
fr_app_priority_get_t priority
Assign a priority to the packet.
Definition: application.h:90
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition: atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:444
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define UNUSED
Definition: build.h:313
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1126
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition: channel.c:922
fr_table_num_sorted_t const channel_signals[]
Definition: channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition: channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition: channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition: channel.c:306
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition: channel.c:930
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition: channel.c:910
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition: channel.c:963
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_EMPTY
Definition: channel.h:75
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_ERROR
Definition: channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition: channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition: channel.h:146
#define PRIORITY_NORMAL
Definition: channel.h:151
#define PRIORITY_NOW
Definition: channel.h:149
uint32_t priority
Priority of this packet.
Definition: channel.h:140
Channel information which is added to a message.
Definition: channel.h:104
char const * parent
e.g. "show module"
Definition: command.h:52
#define CMD_TABLE_END
Definition: command.h:62
char const ** argv
text version of commands
Definition: command.h:42
#define FR_CONTROL_ID_INJECT
Definition: control.h:60
#define FR_CONTROL_ID_DIRECTORY
Definition: control.h:59
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define FR_CONTROL_ID_LISTEN
Definition: control.h:57
#define FR_CONTROL_ID_WORKER
Definition: control.h:58
#define FR_CONTROL_MAX_SIZE
Definition: control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition: control.h:50
static fr_ring_buffer_t * rb
Definition: control_test.c:51
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:137
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:154
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:182
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
static int sockfd
Definition: dhcpclient.c:56
#define fr_event_fd_insert(...)
Definition: event.h:232
fr_event_filter_t
The type of filter to install for an FD.
Definition: event.h:61
@ FR_EVENT_FILTER_VNODE
Filter for vnode subfilters.
Definition: event.h:63
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition: event.h:62
#define fr_event_filter_update(...)
Definition: event.h:224
#define fr_event_filter_insert(...)
Definition: event.h:219
#define FR_EVENT_RESUME(_s, _f)
Re-add the filter for a func from kevent.
Definition: event.h:110
#define FR_EVENT_SUSPEND(_s, _f)
Temporarily remove the filter for a func from kevent.
Definition: event.h:94
fr_event_fd_cb_t extend
Additional files were added to a directory.
Definition: event.h:183
Callbacks for the FR_EVENT_FILTER_IO filter.
Definition: event.h:173
Structure describing a modification to a filter's state.
Definition: event.h:75
Callbacks for the FR_EVENT_FILTER_VNODE filter.
Definition: event.h:180
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition: heap.c:322
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
unsigned int fr_heap_index_t
Definition: heap.h:80
#define fr_heap_alloc(_ctx, _cmp, _type, _field, _init)
Creates a heap that can be used with non-talloced elements.
Definition: heap.h:100
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define FR_HEAP_INDEX_INVALID
Definition: heap.h:83
The main heap structure.
Definition: heap.h:66
uint64_t out
Definition: base.h:43
uint64_t dup
Definition: base.h:44
uint64_t dropped
Definition: base.h:45
uint64_t in
Definition: base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition: control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
The control structure.
Definition: control.c:79
size_t num_messages
for the message ring buffer
Definition: listen.h:52
bool non_socket_listener
special internal listener that does not use sockets.
Definition: listen.h:45
char const * name
printable name for this socket - set by open
Definition: listen.h:29
void const * app_instance
Definition: listen.h:38
size_t default_message_size
copied from app_io, but may be changed
Definition: listen.h:51
fr_app_t const * app
Definition: listen.h:37
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition: listen.h:40
bool no_write_callback
sometimes we don't need to do writes
Definition: listen.h:44
int fd
file descriptor for this socket - set by open
Definition: listen.h:28
bool needs_full_setup
Set to true to avoid the short cut when adding the listener.
Definition: listen.h:46
fr_app_io_t const * app_io
I/O path functions.
Definition: listen.h:31
fr_ring_buffer_t * rb
ring buffer for my control-plane messages
Definition: network.c:126
fr_cmd_table_t cmd_network_table[]
Definition: network.c:2073
size_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li)
Get the number of outstanding packets.
Definition: network.c:792
size_t written
however much we did in a partial write
Definition: network.c:92
int fr_network_listen_send_packet(fr_network_t *nr, fr_listen_t *parent, fr_listen_t *li, const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
Send a packet to the worker.
Definition: network.c:750
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition: network.c:122
static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition: network.c:2053
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition: network.c:236
bool suspended
whether or not we're suspended.
Definition: network.c:117
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network.
Definition: network.c:293
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition: network.c:1664
fr_network_t * nr
O(N) issues in talloc.
Definition: network.c:77
fr_io_stats_t stats
Definition: network.c:70
fr_listen_t * listen
Definition: network.c:52
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new listener.
Definition: network.c:1293
uint8_t * packet
Definition: network.c:53
static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition: network.c:2020
fr_log_t const * log
log destination
Definition: network.c:119
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition: network.c:278
static int _fr_network_free(fr_network_t *nr)
Free any resources associated with a network thread.
Definition: network.c:1845
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a packet sent to a socket.
Definition: network.c:1538
fr_heap_index_t heap_id
for the sockets_by_num heap
Definition: network.c:79
#define RTT(_old, _new)
Definition: network.c:465
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition: network.c:1865
void fr_network(fr_network_t *nr)
The main network worker function.
Definition: network.c:1777
fr_message_set_t * ms
message buffers for this socket.
Definition: network.c:89
int fr_network_listen_delete(fr_network_t *nr, fr_listen_t *li)
Delete a socket from a network.
Definition: network.c:259
int num_blocked
number of blocked workers
Definition: network.c:138
char const * name
Network ID for logging.
Definition: network.c:113
unsigned int outstanding
number of outstanding packets sent to the worker
Definition: network.c:86
static _Thread_local fr_ring_buffer_t * fr_network_rb
Definition: network.c:49
int number
unique ID
Definition: network.c:78
static fr_event_update_t const resume_write[]
Definition: network.c:1112
fr_time_delta_t predicted
predicted processing time for one packet
Definition: network.c:64
static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx)
fr_worker_t * worker
worker pointer
Definition: network.c:69
int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
Definition: network.c:1017
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition: network.c:1832
#define MAX_WORKERS
Definition: network.c:47
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
Inject a packet for a listener to read.
Definition: network.c:385
fr_listen_t * listen
I/O ctx and functions.
Definition: network.c:87
int num_sockets
actually a counter...
Definition: network.c:141
fr_rb_node_t listen_node
rbtree node for looking up by listener.
Definition: network.c:74
static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
Get a notification that a vnode changed.
Definition: network.c:1062
static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Read handler for signal pipe.
Definition: network.c:1747
#define OUTSTANDING(_x)
Definition: network.c:582
static int8_t reply_cmp(void const *one, void const *two)
Definition: network.c:157
int num_workers
number of active workers
Definition: network.c:137
static int8_t socket_num_cmp(void const *one, void const *two)
Definition: network.c:186
int num_pending_workers
number of workers we're waiting to start.
Definition: network.c:139
fr_rb_tree_t * sockets
list of sockets we're managing, ordered by the listener
Definition: network.c:134
pthread_t thread_id
for self
Definition: network.c:115
fr_log_lvl_t lvl
debug log level
Definition: network.c:120
int signal_pipe[2]
Pipe for signalling the worker in an orderly way.
Definition: network.c:143
fr_channel_data_t * pending
the currently pending partial packet
Definition: network.c:94
static int8_t socket_listen_cmp(void const *one, void const *two)
Definition: network.c:179
static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
Write packets to the network.
Definition: network.c:1125
fr_event_list_t * el
our event list
Definition: network.c:128
fr_heap_t * replies
replies from the worker, ordered by priority / origin time
Definition: network.c:130
static int fr_network_send_request(fr_network_t *nr, fr_channel_data_t *cd)
Send a message on the "best" channel.
Definition: network.c:589
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition: network.c:2006
fr_heap_t * waiting
packets waiting to be written
Definition: network.c:95
int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
Definition: network.c:1990
fr_heap_index_t heap_id
workers are in a heap
Definition: network.c:62
bool blocked
is this worker blocked?
Definition: network.c:66
static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
Read a packet from the network.
Definition: network.c:849
static bool is_network_thread(fr_network_t const *nr)
Definition: network.c:224
fr_rb_node_t num_node
rbtree node for looking up by number.
Definition: network.c:75
static void fr_network_error(UNUSED fr_event_list_t *el, UNUSED int sockfd, int flags, int fd_errno, void *ctx)
Handle errors for a socket.
Definition: network.c:1087
fr_io_stats_t stats
Definition: network.c:96
fr_channel_data_t * cd
cached in case of allocation & read error
Definition: network.c:90
static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
Definition: network.c:1307
static void fr_network_suspend(fr_network_t *nr)
Definition: network.c:426
bool dead
is it dead?
Definition: network.c:83
size_t leftover
leftover data from a previous read
Definition: network.c:91
static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx)
fr_network_worker_t * workers[MAX_WORKERS]
each worker
Definition: network.c:149
fr_time_t recv_time
Definition: network.c:55
static void fr_network_unsuspend(fr_network_t *nr)
Definition: network.c:445
fr_rb_tree_t * sockets_by_num
list of sockets we're managing, ordered by number
Definition: network.c:135
fr_network_config_t config
configuration
Definition: network.c:148
void fr_network_listen_read(fr_network_t *nr, fr_listen_t *li)
Signal the network to read from a listener.
Definition: network.c:311
static int8_t waiting_cmp(void const *one, void const *two)
Definition: network.c:168
fr_io_stats_t stats
Definition: network.c:132
size_t packet_len
Definition: network.c:54
static fr_ring_buffer_t * fr_network_rb_init(void)
Initialise thread local storage.
Definition: network.c:206
static int _fr_network_rb_free(void *arg)
Definition: network.c:197
static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the network side.
Definition: network.c:473
int max_workers
maximum number of allowed workers
Definition: network.c:140
void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, void *packet_ctx, fr_time_t request_time)
Inject a packet for a listener to write.
Definition: network.c:337
bool exiting
are we exiting?
Definition: network.c:146
fr_event_filter_t filter
what type of filter it is
Definition: network.c:81
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
Definition: network.c:808
static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new "watch directory".
Definition: network.c:1419
fr_channel_t * channel
channel to the worker
Definition: network.c:68
static fr_event_update_t const pause_write[]
Definition: network.c:1107
static int _network_socket_free(fr_network_socket_t *s)
Definition: network.c:1250
static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a network control message callback for a channel.
Definition: network.c:521
static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition: network.c:2033
static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new worker.
Definition: network.c:1485
fr_time_delta_t cpu_time
how much CPU time this worker has spent
Definition: network.c:63
fr_control_t * control
the control plane
Definition: network.c:124
bool blocked
is it blocked?
Definition: network.c:84
Associate a worker thread with a network thread.
Definition: network.c:61
uint32_t max_outstanding
Definition: network.h:46
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition: log.h:641
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition: event.c:2328
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition: event.c:2542
int fr_event_pre_delete(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Delete a pre-event callback from the event list.
Definition: event.c:2274
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition: event.c:2407
talloc_free(reap)
int fr_event_post_insert(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition: event.c:2306
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition: event.c:2252
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1253
Stores all information relating to an event list.
Definition: event.c:411
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition: log.c:599
fr_log_lvl_t
Definition: log.h:67
@ L_DBG
Only displayed when debugging is enabled.
Definition: log.h:59
static fr_event_update_t pause_read[]
Definition: master.c:154
static fr_event_update_t resume_read[]
Definition: master.c:159
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition: message.c:242
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition: message.c:934
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition: message.c:1077
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition: message.h:51
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
@ FR_MESSAGE_USED
Definition: message.h:39
@ FR_MESSAGE_LOCALIZED
Definition: message.h:40
fr_message_status_t status
free, used, done, etc.
Definition: message.h:45
int fr_nonblock(UNUSED int fd)
Definition: misc.c:284
static const conf_parser_t config[]
Definition: base.c:188
static rc_stats_t stats
Definition: radclient-ng.c:72
#define DEBUG2(fmt,...)
Definition: radclient.h:43
static fr_app_io_t app_io
Definition: radius1_test.c:146
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition: rand.c:106
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition: rb.c:775
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition: rb.c:844
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition: rb.c:624
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition: rb.c:736
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:818
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition: rb.c:576
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black tree that verifies elements are of a specific talloc type.
Definition: rb.h:246
int fr_rb_flatten_inorder(TALLOC_CTX *ctx, void **out[], fr_rb_tree_t *tree)
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red black tree structure.
Definition: rb.h:73
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
static char const * name
static char buff[sizeof("18446744073709551615")+3]
Definition: size_tests.c:41
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
Definition: log.h:96
char const * fr_syserror(int num)
A version of strerror that is guaranteed to be thread-safe.
Definition: syserror.c:243
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:253
#define talloc_get_type_abort_const
Definition: talloc.h:270
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:573
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition: time.h:255
#define fr_time_delta_lt(_a, _b)
Definition: time.h:283
#define fr_time_delta_ispos(_a)
Definition: time.h:288
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:914
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
close(uq->fd)
static fr_event_list_t * el
static fr_slen_t parent
Definition: pair.h:844
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition: strerror.c:733
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1259
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition: worker.c:1602
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:1629
A worker which takes packets from a master, and processes them.
Definition: worker.c:94