The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
network.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: 9c7f4c54b722cd39885a1ac1a8a7e1684ed4262b $
19  *
20  * @brief Receiver of socket data, which sends messages to the workers.
21  * @file io/network.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: 9c7f4c54b722cd39885a1ac1a8a7e1684ed4262b $")
26 
27 #define LOG_PREFIX nr->name
28 
29 #define LOG_DST nr->log
30 
31 #include <freeradius-devel/util/event.h>
32 #include <freeradius-devel/util/misc.h>
33 #include <freeradius-devel/util/rand.h>
34 #include <freeradius-devel/util/rb.h>
35 #include <freeradius-devel/util/syserror.h>
36 #include <freeradius-devel/util/atexit.h>
37 #include <freeradius-devel/util/talloc.h>
38 
39 #include <freeradius-devel/io/channel.h>
40 #include <freeradius-devel/io/control.h>
41 #include <freeradius-devel/io/listen.h>
42 #include <freeradius-devel/io/network.h>
43 #include <freeradius-devel/io/queue.h>
44 #include <freeradius-devel/io/ring_buffer.h>
45 #include <freeradius-devel/io/worker.h>
46 
47 #define MAX_WORKERS 64
48 
49 static _Thread_local fr_ring_buffer_t *fr_network_rb;
50 
51 typedef struct {
54  size_t packet_len;
57 
58 /** Associate a worker thread with a network thread
59  *
60  */
61 typedef struct {
62  fr_heap_index_t heap_id; //!< workers are in a heap
63  fr_time_delta_t cpu_time; //!< how much CPU time this worker has spent
64  fr_time_delta_t predicted; //!< predicted processing time for one packet
65 
66  bool blocked; //!< is this worker blocked?
67 
68  fr_channel_t *channel; //!< channel to the worker
69  fr_worker_t *worker; //!< worker pointer
72 
73 typedef struct {
74  fr_rb_node_t listen_node; //!< rbtree node for looking up by listener.
75  fr_rb_node_t num_node; //!< rbtree node for looking up by number.
76 
77  fr_network_t *nr; //!< O(N) issues in talloc
78  int number; //!< unique ID
79  fr_heap_index_t heap_id; //!< for the sockets_by_num heap
80 
81  fr_event_filter_t filter; //!< what type of filter it is
82 
83  bool dead; //!< is it dead?
84  bool blocked; //!< is it blocked?
85 
86  unsigned int outstanding; //!< number of outstanding packets sent to the worker
87  fr_listen_t *listen; //!< I/O ctx and functions.
88 
89  fr_message_set_t *ms; //!< message buffers for this socket.
90  fr_channel_data_t *cd; //!< cached in case of allocation & read error
91  size_t leftover; //!< leftover data from a previous read
92  size_t written; //!< however much we did in a partial write
93 
94  fr_channel_data_t *pending; //!< the currently pending partial packet
95  fr_heap_t *waiting; //!< packets waiting to be written
98 
99 /*
100  * We have an array of workers, so we can index the workers in
101  * O(1) time, instead of keeping a heap of "workers ordered by CPU
102  * time". When we send a packet to a worker, we just update its
103  * predicted CPU time in place. When we receive a reply from a
104  * worker, we just update its CPU time and prediction in place.
105  *
106  * When we need to choose a worker, we pick 2 at random, and then
107  * choose the one with fewer outstanding packets, using the lower
108  * CPU time as a tie-breaker. For background, see "Power of Two-Choices" and
109  * https://www.eecs.harvard.edu/~michaelm/postscripts/mythesis.pdf
110  * https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf
111  */
112 struct fr_network_s {
113  char const *name; //!< Network ID for logging.
114 
115  pthread_t thread_id; //!< for self
116 
117  bool suspended; //!< whether or not we're suspended.
118 
119  fr_log_t const *log; //!< log destination
120  fr_log_lvl_t lvl; //!< debug log level
121 
122  fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
123 
124  fr_control_t *control; //!< the control plane
125 
126  fr_ring_buffer_t *rb; //!< ring buffer for my control-plane messages
127 
128  fr_event_list_t *el; //!< our event list
129 
130  fr_heap_t *replies; //!< replies from the worker, ordered by priority / origin time
131 
133 
134  fr_rb_tree_t *sockets; //!< list of sockets we're managing, ordered by the listener
135  fr_rb_tree_t *sockets_by_num; //!< ordered by number;
136 
137  int num_workers; //!< number of active workers
138  int num_blocked; //!< number of blocked workers
139  int num_pending_workers; //!< number of workers we're waiting to start.
140  int max_workers; //!< maximum number of allowed workers
141  int num_sockets; //!< actually a counter...
142 
143  int signal_pipe[2]; //!< Pipe for signalling the worker in an orderly way.
144  ///< This is more deterministic than using async signals.
145 
146  bool exiting; //!< are we exiting?
147 
148  fr_network_config_t config; //!< configuration
150 };
151 
153 static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx);
155 static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx);
156 
157 static int8_t reply_cmp(void const *one, void const *two)
158 {
159  fr_channel_data_t const *a = one, *b = two;
160  int ret;
161 
162  ret = CMP(a->priority, b->priority);
163  if (ret != 0) return ret;
164 
165  return fr_time_cmp(a->m.when, b->m.when);
166 }
167 
168 static int8_t waiting_cmp(void const *one, void const *two)
169 {
170  fr_channel_data_t const *a = one, *b = two;
171  int ret;
172 
173  ret = CMP(a->priority, b->priority);
174  if (ret != 0) return ret;
175 
176  return fr_time_cmp(a->reply.request_time, b->reply.request_time);
177 }
178 
179 static int8_t socket_listen_cmp(void const *one, void const *two)
180 {
181  fr_network_socket_t const *a = one, *b = two;
182 
183  return CMP(a->listen, b->listen);
184 }
185 
186 static int8_t socket_num_cmp(void const *one, void const *two)
187 {
188  fr_network_socket_t const *a = one, *b = two;
189 
190  return CMP(a->number, b->number);
191 }
192 
/** Free the memory allocated to the network ring buffer.
 *
 * The cleanup is explicit so that tools such as valgrind don't
 * report the buffer as leaked.
 *
 * @param[in] arg	talloc'd memory to release (presumably the
 *			thread-local fr_network_rb - confirm at the
 *			registration site).
 * @return whatever talloc_free() returns.
 */
static int _fr_network_rb_free(void *arg)
{
	int rcode = talloc_free(arg);

	return rcode;
}
201 
202 /** Initialise thread local storage
203  *
204  * @return fr_ring_buffer_t for messages
205  */
207 {
209 
210  rb = fr_network_rb;
211  if (rb) return rb;
212 
214  if (!rb) {
215  fr_perror("Failed allocating memory for network ring buffer");
216  return NULL;
217  }
218 
220 
221  return rb;
222 }
223 
224 static inline bool is_network_thread(fr_network_t const *nr)
225 {
226  return (pthread_equal(pthread_self(), nr->thread_id) != 0);
227 }
228 
229 static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen);
230 
231 /** Add a fr_listen_t to a network
232  *
233  * @param nr the network
234  * @param li the listener
235  */
237 {
239 
240  /*
241  * Skip a bunch of work if we're already in the network thread.
242  */
243  if (is_network_thread(nr) && !li->needs_full_setup) {
244  return fr_network_listen_add_self(nr, li);
245  }
246 
248  if (!rb) return -1;
249 
250  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
251 }
252 
253 
254 /** Delete a socket from a network. MUST be called only by the listener itself!.
255  *
256  * @param nr the network
257  * @param li the listener
258  */
260 {
262 
264 
265  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
266  if (!s) return -1;
267 
268  fr_network_socket_dead(nr, s);
269 
270  return 0;
271 }
272 
273 /** Add a "watch directory" call to a network
274  *
275  * @param nr the network
276  * @param li the listener
277  */
279 {
281 
283  if (!rb) return -1;
284 
285  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_DIRECTORY, &li, sizeof(li));
286 }
287 
288 /** Add a worker to a network in a different thread
289  *
290  * @param nr the network
291  * @param worker the worker
292  */
294 {
296 
298  if (!rb) return -1;
299 
300  (void) talloc_get_type_abort(nr, fr_network_t);
301  (void) talloc_get_type_abort(worker, fr_worker_t);
302 
303  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_WORKER, &worker, sizeof(worker));
304 }
305 
306 static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now);
307 
308 /** Add a worker to a network in the same thread
309  *
310  * @param nr the network
311  * @param worker the worker
312  */
314 {
315  fr_network_worker_started_callback(nr, &worker, sizeof(worker), fr_time_wrap(0));
316 }
317 
318 
319 /** Signal the network to read from a listener
320  *
321  * @param nr the network
322  * @param li the listener to read from
323  */
325 {
327 
328  (void) talloc_get_type_abort(nr, fr_network_t);
330 
331  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
332  if (!s) return;
333 
334  /*
335  * Go read the socket.
336  */
337  fr_network_read(nr->el, s->listen->fd, 0, s);
338 }
339 
340 
341 /** Inject a packet for a listener to write
342  *
343  * @param nr the network
344  * @param li the listener where the packet is being injected
345  * @param packet the packet to be written
346  * @param packet_len the length of the packet
347  * @param packet_ctx The packet context to write
348  * @param request_time when the packet was received.
349  */
350 void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len,
351  void *packet_ctx, fr_time_t request_time)
352 {
353  fr_message_t *lm;
355 
356  cd = (fr_channel_data_t) {
357  .m = (fr_message_t) {
359  .data_size = packet_len,
360  .when = request_time,
361  },
362 
363  .channel = {
364  .heap_id = FR_HEAP_INDEX_INVALID,
365  },
366 
367  .listen = li,
368  .priority = PRIORITY_NOW,
369  .reply.request_time = request_time,
370  };
371 
372  memcpy(&cd.m.data, &packet, sizeof(packet)); /* const issues */
373  memcpy(&cd.packet_ctx, &packet_ctx, sizeof(packet_ctx)); /* const issues */
374 
375  /*
376  * Localize the message and insert it into the heap of pending messages.
377  */
378  lm = fr_message_localize(nr, &cd.m, sizeof(cd));
379  if (!lm) return;
380 
381  if (fr_heap_insert(&nr->replies, lm) < 0) {
382  fr_message_done(lm);
383  }
384 }
385 
386 
387 /** Inject a packet for a listener to read
388  *
389  * @param nr the network
390  * @param li the listener where the packet is being injected
391  * @param packet the packet to be injected
392  * @param packet_len the length of the packet
393  * @param recv_time when the packet was received.
394  * @return
395  * - <0 on error
396  * - 0 on success
397  */
398 int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
399 {
401  fr_network_inject_t my_inject;
402 
403  /*
404  * Can't inject to injection-less destinations.
405  */
406  if (!li->app_io->inject) return -1;
407 
408  /*
409  * Avoid a bounce through the event loop if we're being called from the network thread.
410  */
411  if (is_network_thread(nr)) {
413 
414  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
415  if (!s) return -1;
416 
417  /*
418  * Inject the packet. The master.c mod_read() routine will then take care of avoiding
419  * IO, and instead return the packet to the network side.
420  */
421  if (li->app_io->inject(li, packet, packet_len, recv_time) == 0) {
422  fr_network_read(nr->el, li->fd, 0, s);
423  }
424 
425  return 0;
426  }
427 
429  if (!rb) return -1;
430 
431  my_inject.listen = li;
432  my_inject.packet = talloc_memdup(NULL, packet, packet_len);
433  my_inject.packet_len = packet_len;
434  my_inject.recv_time = recv_time;
435 
436  return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
437 }
438 
440 {
441  static fr_event_update_t pause_read[] = {
443  { 0 }
444  };
447 
448  if (nr->suspended) return;
449 
450  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
451  s != NULL;
454  }
455  nr->suspended = true;
456 }
457 
459 {
460  static fr_event_update_t resume_read[] = {
462  { 0 }
463  };
466 
467  if (!nr->suspended) return;
468 
469  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
470  s != NULL;
473  }
474  nr->suspended = false;
475 }
476 
/*
 *	Moving average used for a worker's predicted processing time:
 *	the new sample contributes 1/IALPHA of the result and the old
 *	estimate contributes (IALPHA - 1)/IALPHA.  The division is on
 *	the unwrapped values, so (assuming they are integers) the
 *	result is truncated.
 */
#define IALPHA (8)
#define RTT(_old, _new) fr_time_delta_wrap(((fr_time_delta_unwrap(_old) * (IALPHA - 1)) + fr_time_delta_unwrap(_new)) / IALPHA)
479 
480 /** Callback which handles a message being received on the network side.
481  *
482  * @param[in] ctx the network
483  * @param[in] ch the channel that the message is on.
484  * @param[in] cd the message (if any) to start with
485  */
486 static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
487 {
488  fr_network_t *nr = ctx;
489  fr_network_worker_t *worker;
490 
491  cd->channel.ch = ch;
492 
493  /*
494  * Update stats for the worker.
495  */
496  worker = fr_channel_requestor_uctx_get(ch);
497  worker->stats.out++;
498  worker->cpu_time = cd->reply.cpu_time;
499  if (!fr_time_delta_ispos(worker->predicted)) {
500  worker->predicted = cd->reply.processing_time;
501  } else {
502  worker->predicted = RTT(worker->predicted, cd->reply.processing_time);
503  }
504 
505  /*
506  * Unblock the worker.
507  */
508  if (worker->blocked) {
509  worker->blocked = false;
510  nr->num_blocked--;
512  }
513 
514  /*
515  * Ensure that heap insert works.
516  */
517  cd->channel.heap_id = FR_HEAP_INDEX_INVALID;
518  if (fr_heap_insert(&nr->replies, cd) < 0) {
519  fr_message_done(&cd->m);
520  fr_assert(0 == 1);
521  }
522 }
523 
524 /** Handle a network control message callback for a channel
525  *
526  * This is called from the event loop when we get a notification
527  * from the event signalling pipe.
 *
 * The message is decoded with fr_channel_service_message() and the
 * resulting channel event is dispatched:
 *  - FR_CHANNEL_ERROR, FR_CHANNEL_EMPTY: return immediately.
 *  - FR_CHANNEL_NOOP: nothing to do.
 *  - the data-ready case whose body calls fr_channel_recv_reply():
 *    drain every queued reply on the channel.
 *  - FR_CHANNEL_OPEN and the other data-ready case: not expected on
 *    this side, so they trip fr_assert(0 == 1).
 *  - FR_CHANNEL_CLOSE: the worker acked our close request; remove it
 *    from nr->workers and decrement nr->num_workers.
 *
 * NOTE(review): this listing omits a few lines of the function (e.g.
 * the declaration of `ce` and two case labels); the descriptions of
 * those two cases are inferred from their visible bodies.
528  *
529  * @param[in] ctx the network
530  * @param[in] data the message
531  * @param[in] data_size size of the data
532  * @param[in] now the current time
533  */
534 static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
535 {
537  fr_channel_t *ch;
538  fr_network_t *nr = ctx;
539
540  ce = fr_channel_service_message(now, &ch, data, data_size);
541  DEBUG3("Channel %s",
542  fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
543  switch (ce) {
544  case FR_CHANNEL_ERROR:
545  return;
546
547  case FR_CHANNEL_EMPTY:
548  return;
549
550  case FR_CHANNEL_NOOP:
551  break;
552
 /* Drain every reply currently queued on this channel. */
554  fr_assert(ch != NULL);
555  while (fr_channel_recv_reply(ch));
556  break;
557
559  fr_assert(0 == 1);
560  break;
561
562  case FR_CHANNEL_OPEN:
563  fr_assert(0 == 1);
564  break;
565
566  case FR_CHANNEL_CLOSE:
567  {
568  fr_network_worker_t *w = talloc_get_type_abort(fr_channel_requestor_uctx_get(ch),
570  int i;
571
572  /*
573  * Remove this worker from the array
574  */
575  for (i = 0; i < nr->num_workers; i++) {
 /*
  * NOTE(review): this DEBUG3 runs once per loop iteration,
  * not once per close request; it probably belongs before
  * the loop.
  */
576  DEBUG3("Worker acked our close request");
577  if (nr->workers[i] == w) {
578  nr->workers[i] = NULL;
579
580  if (i == (nr->num_workers - 1)) break;
581
582  /*
583  * Close the hole...
584  */
 /*
  * NOTE(review): BUG - the size argument is an element count,
  * not a byte count, so only (num_workers - i - 1) BYTES are
  * copied. That leaves workers[i] holding a partially
  * overwritten (corrupt) pointer. The source and destination
  * also overlap, which is undefined for memcpy(). It should
  * probably be:
  *   memmove(&nr->workers[i], &nr->workers[i + 1],
  *           ((nr->num_workers - i) - 1) * sizeof(nr->workers[0]));
  *   nr->workers[nr->num_workers - 1] = NULL;
  * The vacated last slot also needs clearing, because
  * fr_network_socket_dead() walks all max_workers slots and
  * only skips NULL entries.
  */
585  memcpy(&nr->workers[i], &nr->workers[i + 1], ((nr->num_workers - i) - 1));
586  break;
587  }
588  }
 /* NOTE(review): decremented even if w was not found in the array. */
589  nr->num_workers--;
590  }
591  break;
592  }
593 }
594 
/*
 *	Number of requests handed to a worker that it has not replied
 *	to yet: requests sent (stats.in) minus replies received
 *	(stats.out).
 */
#define OUTSTANDING(_x) (((_x)->stats.in) - ((_x)->stats.out))
596 
597 /** Send a message on the "best" channel.
598  *
599  * @param nr the network
600  * @param cd the message we've received
601  */
603 {
604  fr_network_worker_t *worker;
605 
606  (void) talloc_get_type_abort(nr, fr_network_t);
607 
608 retry:
609  if (nr->num_workers == 1) {
610  worker = nr->workers[0];
611  if (worker->blocked) {
612  RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
613  "In single-threaded mode and worker is blocked");
614  drop:
615  worker->stats.dropped++;
616  return -1;
617  }
618 
619  } else if (nr->num_blocked == 0) {
620  int64_t cmp;
621  uint32_t one, two;
622 
623  one = fr_rand() % nr->num_workers;
624  do {
625  two = fr_rand() % nr->num_workers;
626  } while (two == one);
627 
628  /*
629  * Choose a worker based on minimizing the amount
630  * of future work it's being asked to do.
631  *
632  * If both workers have the same number of
633  * outstanding requests, then choose the worker
634  * which has used the least total CPU time.
635  */
636  cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
637  if (cmp < 0) {
638  worker = nr->workers[one];
639 
640  } else if (cmp > 0) {
641  worker = nr->workers[two];
642 
643  } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
644  worker = nr->workers[one];
645 
646  } else {
647  worker = nr->workers[two];
648  }
649  } else {
650  int i;
651  uint64_t min_outstanding = UINT64_MAX;
652  fr_network_worker_t *found = NULL;
653 
654  /*
655  * Some workers are blocked. Pick the worker
656  * with the least amount of future work to do.
657  */
658  for (i = 0; i < nr->num_workers; i++) {
659  uint64_t outstanding;
660 
661  worker = nr->workers[i];
662  if (worker->blocked) continue;
663 
664  outstanding = OUTSTANDING(worker);
665  if ((outstanding < min_outstanding) || !found) {
666  found = worker;
667  min_outstanding = outstanding;
668 
669  } else if (outstanding == min_outstanding) {
670  /*
671  * Queue lengths are the same.
672  * Choose this worker if it's
673  * less busy than the previous one we found.
674  */
675  if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
676  found = worker;
677  }
678  }
679  }
680 
681  if (!found) {
682  RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - Couldn't find active worker, "
683  "%u/%u workers are blocked", nr->num_blocked, nr->num_workers);
684  return -1;
685  }
686 
687  worker = found;
688  }
689 
690  (void) talloc_get_type_abort(worker, fr_network_worker_t);
691 
692  /*
693  * Too many outstanding packets for this worker. Drop
694  * the request.
695  *
696  * If the worker we've picked has too many outstanding
697  * packets, then we have either only one worker, in which
698  * cae we should drop the packet. Or, we were unable to
699  * find a worker with smaller than max_outstanding
700  * packets. In which case all of the workers are likely
701  * at max_outstanding.
702  *
703  * In both cases, we should just drop the new packet.
704  */
705  fr_assert(worker->stats.in >= worker->stats.out);
706  if (nr->config.max_outstanding &&
707  (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
708  RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
709  goto drop;
710  }
711 
712  /*
713  * Send the message to the channel. If we fail, drop the
714  * packet. The only reason for failure is that the
715  * worker isn't servicing it's input queue. When that
716  * happens, we have no idea what to do, and the whole
717  * thing falls over.
718  */
719  if (fr_channel_send_request(worker->channel, cd) < 0) {
720  worker->stats.dropped++;
721  worker->blocked = true;
722  nr->num_blocked++;
723 
724  RATE_LIMIT_GLOBAL(PERROR, "Failed sending packet to worker - %u/%u workers are blocked",
725  nr->num_blocked, nr->num_workers);
726 
727  if (nr->num_blocked == nr->num_workers) {
728  fr_network_suspend(nr);
729  return -1;
730  }
731  goto retry;
732  }
733 
734  worker->stats.in++;
735 
736  /*
737  * We're projecting that the worker will use more CPU
738  * time to process this request. The CPU time will be
739  * updated with a more accurate number when we receive a
740  * reply from this channel.
741  */
742  worker->cpu_time = fr_time_delta_add(worker->cpu_time, worker->predicted);
743 
744  return 0;
745 }
746 
747 
748 /** Send a packet to the worker.
749  *
750  * MUST only be called from the network thread.
751  *
752  * @param nr the network
753  * @param parent the parent listener
754  * @param li the listener that the packet was "read" from. Can be "parent"
755  * @param buffer the packet to send
756  * @param buflen size of the packet to send
757  * @param recv_time of the packet
758  * @param packet_ctx for the packet
759  * @return
760  * - <0 on error
761  * - 0 on success
762  */
764  const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
765 {
766  fr_channel_data_t *cd;
768 
769  (void) talloc_get_type_abort(nr, fr_network_t);
771 
772  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
773  if (!s) return -1;
774 
775  cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, buflen);
776  if (!cd) return -1;
777 
778  cd->listen = parent;
780  cd->packet_ctx = packet_ctx;
781  cd->request.recv_time = recv_time;
782  memcpy(cd->m.data, buffer, buflen);
783  cd->m.when = fr_time();
784 
785  if (fr_network_send_request(nr, cd) < 0) {
786  talloc_free(cd->packet_ctx);
787  fr_message_done(&cd->m);
788  nr->stats.dropped++;
789  s->stats.dropped++;
790  return -1;
791  }
792 
793  s->outstanding++;
794  return 0;
795 }
796 
797 /** Get the number of outstanding packets
798  *
799  * @param nr the network
800  * @param li the listener that the packet was "read" from
801  * @return
802  * - <0 on error
803  * - the number of outstanding packets
804 */
807 
808  (void) talloc_get_type_abort(nr, fr_network_t);
810 
811  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
812  if (!s) return -1;
813 
814  return s->outstanding;
815 }
816 
817 /*
818  * Mark it as dead, but DON'T free it until all of the replies
819  * have come in.
820  */
822 {
823  int i;
824 
825  if (s->dead) return;
826 
827  s->dead = true;
828 
829  fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
830 
831 
832  for (i = 0; i < nr->max_workers; i++) {
833  if (!nr->workers[i]) continue;
834 
835  (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
836  }
837 
838  /*
839  * If there are no outstanding packets, then we can free
840  * it now.
841  */
842  if (!s->outstanding) {
843  talloc_free(s);
844  return;
845  }
846 
847  /*
848  * There are still outstanding packets. Leave it in the
849  * socket tree, so that replies from the worker can find
850  * it. When we've received all of the replies, then
851  * fr_network_post_event() will clean up this socket.
852  */
853 }
854 
855 /** Read a packet from the network.
856  *
857  * @param[in] el the event list.
858  * @param[in] sockfd the socket which is ready to read.
859  * @param[in] flags from kevent.
860  * @param[in] ctx the network socket context.
861  */
862 static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
863 {
864  int num_messages = 0;
865  fr_network_socket_t *s = ctx;
866  fr_network_t *nr = s->nr;
867  ssize_t data_size;
868  fr_channel_data_t *cd, *next;
869 
870  if (!fr_cond_assert_msg(s->listen->fd == sockfd, "Expected listen->fd (%u) to be equal event fd (%u)",
871  s->listen->fd, sockfd)) return;
872 
873  DEBUG3("Reading data from FD %u", sockfd);
874 
875  if (!s->cd) {
877  if (!cd) {
878  ERROR("Failed allocating message size %zd! - Closing socket",
880  fr_network_socket_dead(nr, s);
881  return;
882  }
883  } else {
884  cd = s->cd;
885  }
886 
887  fr_assert(cd->m.data != NULL);
888 
889 next_message:
890  /*
891  * Poll this socket, but not too often. We have to go
892  * service other sockets, too.
893  */
894  if (num_messages > 16) {
895  s->cd = cd;
896  return;
897  }
898 
900 
901  /*
902  * Read data from the network.
903  *
904  * Return of 0 means "no data", which is fine for UDP.
905  * For TCP, if an underlying read() on the TCP socket
906  * returns 0, (which signals that the FD is no longer
907  * usable) this function should return -1, so that the
908  * network side knows that it needs to close the
909  * connection.
910  */
911  data_size = s->listen->app_io->read(s->listen, &cd->packet_ctx, &cd->request.recv_time,
912  cd->m.data, cd->m.rb_size, &s->leftover);
913  if (data_size == 0) {
914  /*
915  * Cache the message for later. This is
916  * important for stream sockets, which can do
917  * partial reads into the current buffer. We
918  * need to be able to give the same buffer back
919  * to the stream socket for subsequent reads.
920  *
921  * Since we have a message set for each
922  * fr_io_socket_t, no "head of line"
923  * blocking issues can happen for stream sockets.
924  */
925  s->cd = cd;
926  return;
927  }
928 
929  /*
930  * Error: close the connection, and remove the fr_listen_t
931  */
932  if (data_size < 0) {
933 // fr_log(nr->log, L_DBG_ERR, "error from transport read on socket %d", sockfd);
934  fr_network_socket_dead(nr, s);
935  return;
936  }
937  s->cd = NULL;
938 
939  DEBUG3("Read %zd byte(s) from FD %u", data_size, sockfd);
940  nr->stats.in++;
941  s->stats.in++;
942 
943  /*
944  * Initialize the rest of the fields of the channel data.
945  *
946  * We always use "now" as the time of the message, as the
947  * packet MAY be a duplicate packet magically resurrected
948  * from the past. i.e. If the read routines are doing
949  * dedup, then they notice that the packet is a
950  * duplicate. In that case, they send over a copy of the
951  * packet, BUT with the original timestamp. This
952  * information tells the worker that the packet is a
953  * duplicate.
954  */
955  cd->m.when = fr_time();
956  cd->listen = s->listen;
957 
958  /*
959  * Nothing in the buffer yet. Allocate room for one
960  * packet.
961  */
962  if ((cd->m.data_size == 0) && (!s->leftover)) {
963 
964  (void) fr_message_alloc(s->ms, &cd->m, data_size);
965  next = NULL;
966 
967  } else {
968  /*
969  * There are leftover bytes in the buffer, feed
970  * them to the next round of reading.
971  */
972  next = (fr_channel_data_t *) fr_message_alloc_reserve(s->ms, &cd->m, data_size, s->leftover,
974  if (!next) {
975  PERROR("Failed reserving partial packet.");
976  // @todo - probably close the socket...
977  fr_assert(0 == 1);
978  }
979  }
980 
981  /*
982  * Set the priority. Which incidentally also checks if
983  * we're allowed to read this particular kind of packet.
984  *
985  * That check is because the app_io handlers just read
986  * packets, and don't really have access to the parent
987  * "list of allowed packet types". So we have to do the
988  * work here in a callback.
989  *
990  * That should probably be fixed...
991  */
992  if (s->listen->app->priority) {
993  int priority;
994 
995  priority = s->listen->app->priority(s->listen->app_instance, cd->m.data, data_size);
996  if (priority <= 0) goto discard;
997 
998  cd->priority = priority;
999  }
1000 
1001  if (fr_network_send_request(nr, cd) < 0) {
1002  discard:
1003  talloc_free(cd->packet_ctx); /* not sure what else to do here */
1004  fr_message_done(&cd->m);
1005  nr->stats.dropped++;
1006  s->stats.dropped++;
1007 
1008  } else {
1009  /*
1010  * One more packet sent to a worker.
1011  */
1012  s->outstanding++;
1013  }
1014 
1015  /*
1016  * If there is a next message, go read it from the buffer.
1017  *
1018  * @todo - note that this calls read(), even if the
1019  * app_io has paused the reader. We likely want to be
1020  * able to check that, too. We might just remove this
1021  * "goto"...
1022  */
1023  if (next) {
1024  cd = next;
1025  num_messages++;
1026  goto next_message;
1027  }
1028 }
1029 
1030 int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
1031 {
1032  fr_channel_data_t *cd;
1034 
1035  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1036  if (!s) return -1;
1037 
1038  cd = (fr_channel_data_t *) fr_message_alloc(s->ms, NULL, data_len);
1039  if (!cd) return -1;
1040 
1041  s->stats.in++;
1042 
1043  cd->priority = PRIORITY_NORMAL;
1044 
1045  cd->m.when = recv_time;
1046  cd->listen = li;
1047  cd->packet_ctx = packet_ctx;
1048 
1049  memcpy(cd->m.data, data, data_len);
1050 
1051  if (fr_network_send_request(nr, cd) < 0) {
1052  talloc_free(packet_ctx);
1053  fr_message_done(&cd->m);
1054  nr->stats.dropped++;
1055  s->stats.dropped++;
1056 
1057  } else {
1058  /*
1059  * One more packet sent to a worker.
1060  */
1061  s->outstanding++;
1062  }
1063 
1064  return 0;
1065 }
1066 
1067 
1068 /** Get a notification that a vnode changed
1069  *
1070  * @param[in] el the event list.
1071  * @param[in] sockfd the socket which is ready to read.
1072  * @param[in] fflags from kevent.
1073  * @param[in] ctx the network socket context.
1074  */
1075 static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
1076 {
1077  fr_network_socket_t *s = ctx;
1078  fr_network_t *nr = s->nr;
1079 
1080  fr_cond_assert(s->listen->fd == sockfd);
1081 
1082  DEBUG3("network vnode");
1083 
1084  /*
1085  * Tell the IO handler that something has happened to the
1086  * file.
1087  */
1088  s->listen->app_io->vnode(s->listen, fflags);
1089 }
1090 
1091 
1092 /** Handle errors for a socket.
1093  *
1094  * @param[in] el the event list
1095  * @param[in] sockfd the socket which has a fatal error.
1096  * @param[in] flags returned by kevent.
1097  * @param[in] fd_errno returned by kevent.
1098  * @param[in] ctx the network socket context.
1099  */
1101  int fd_errno, void *ctx)
1102 {
1103  fr_network_socket_t *s = ctx;
1104  fr_network_t *nr = s->nr;
1105 
1106  if (s->listen->app_io->error) {
1107  s->listen->app_io->error(s->listen);
1108 
1109  } else if (flags & EV_EOF) {
1110  DEBUG2("Socket %s closed by peer", s->listen->name);
1111 
1112  } else {
1113  ERROR("Socket %s errored - %s", s->listen->name, fr_syserror(fd_errno));
1114  }
1115 
1116  fr_network_socket_dead(s->nr, s);
1117 }
1118 
1119 
1120 static fr_event_update_t const pause_write[] = {
1122  { 0 }
1123 };
1124 
1127  { 0 }
1128 };
1129 
1130 
1131 /** Write packets to the network.
1132  *
1133  * @param el the event list
1134  * @param sockfd the socket which is ready to write
1135  * @param flags returned by kevent.
1136  * @param ctx the network socket context.
1137  */
1138 static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
1139 {
1140  fr_network_socket_t *s = ctx;
1141  fr_listen_t *li = s->listen;
1142  fr_network_t *nr = s->nr;
1143  fr_channel_data_t *cd;
1144 
1145  (void) talloc_get_type_abort(nr, fr_network_t);
1146 
1147  /*
1148  * Start with the currently pending message, and then
1149  * work through the priority heap.
1150  */
1151  if (s->pending) {
1152  cd = s->pending;
1153  s->pending = NULL;
1154 
1155  } else {
1156  cd = fr_heap_pop(&s->waiting);
1157  }
1158 
1159  while (cd != NULL) {
1160  int rcode;
1161 
1162  fr_assert(li == cd->listen);
1163  rcode = li->app_io->write(li, cd->packet_ctx,
1164  cd->reply.request_time,
1165  cd->m.data, cd->m.data_size, s->written);
1166 
1167  /*
1168  * As a special case, allow write() to return
1169  * "0", which means "close the socket".
1170  */
1171  if (rcode == 0) goto dead;
1172 
1173  /*
1174  * Or we have a write error.
1175  */
1176  if (rcode < 0) {
1177  /*
1178  * Stop processing the heap, and set the
1179  * pending message to the current one.
1180  */
1181  if (errno == EWOULDBLOCK) {
1182  save_pending:
1183  fr_assert(!s->pending);
1184 
1185  if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1186  fr_message_t *lm;
1187 
1188  lm = fr_message_localize(s, &cd->m, sizeof(*cd));
1189  if (!lm) {
1190  ERROR("Failed saving pending packet");
1191  goto dead;
1192  }
1193 
1194  cd = (fr_channel_data_t *) lm;
1195  }
1196 
1197  if (!s->blocked) {
1199  PERROR("Failed adding write callback to event loop");
1200  goto dead;
1201  }
1202 
1203  s->blocked = true;
1204  }
1205 
1206  s->pending = cd;
1207  return;
1208  }
1209 
1210  /*
1211  * As a special hack, check for something
1212  * that will never be returned from a
1213  * real write() routine. Which then
1214  * signals to us that we have to close
1215  * the socket, but NOT complain about it.
1216  */
1217  if (errno == ECONNREFUSED) goto dead;
1218 
1219  PERROR("Failed writing to socket %s", s->listen->name);
1220  if (li->app_io->error) li->app_io->error(li);
1221 
1222  dead:
1223  fr_message_done(&cd->m);
1224  fr_network_socket_dead(nr, s);
1225  return;
1226  }
1227 
1228  /*
1229  * If we've done a partial write, localize the message and continue.
1230  */
1231  if ((size_t) rcode < cd->m.data_size) {
1232  s->written = rcode;
1233  goto save_pending;
1234  }
1235 
1236  s->written = 0;
1237 
1238  /*
1239  * Reset for the next message.
1240  */
1241  fr_message_done(&cd->m);
1242  nr->stats.out++;
1243  s->stats.out++;
1244 
1245  /*
1246  * Grab the net entry.
1247  */
1248  cd = fr_heap_pop(&s->waiting);
1249  }
1250 
1251  /*
1252  * We've successfully written all of the packets. Remove
1253  * the write callback.
1254  */
1256  PERROR("Failed removing write callback from event loop");
1257  fr_network_socket_dead(nr, s);
1258  }
1259 
1260  s->blocked = false;
1261 }
1262 
1264 {
1265  fr_network_t *nr = s->nr;
1266  fr_channel_data_t *cd;
1267 
1268  fr_assert(s->outstanding == 0);
1269 
1270  fr_rb_delete(nr->sockets, s);
1271  fr_rb_delete(nr->sockets_by_num, s);
1272 
1273  fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
1274 
1275  if (s->listen->app_io->close) {
1276  s->listen->app_io->close(s->listen);
1277  } else {
1278  close(s->listen->fd);
1279  }
1280 
1281  if (s->pending) {
1282  fr_message_done(&s->pending->m);
1283  s->pending = NULL;
1284  }
1285 
1286  /*
1287  * Clean up any queued entries.
1288  */
1289  while ((cd = fr_heap_pop(&s->waiting)) != NULL) {
1290  fr_message_done(&cd->m);
1291  }
1292 
1293  talloc_free(s->waiting);
1294  talloc_free(s->listen);
1295 
1296  return 0;
1297 }
1298 
1299 
1300 /** Handle a network control message callback for a new listener
1301  *
1302  * @param[in] ctx the network
1303  * @param[in] data the message
1304  * @param[in] data_size size of the data
1305  * @param[in] now the current time
1306  */
1307 static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1308 {
1309  fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1310  fr_listen_t *li;
1311 
1312  fr_assert(data_size == sizeof(li));
1313 
1314  if (data_size != sizeof(li)) return;
1315 
1316  li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1317 
1318  (void) fr_network_listen_add_self(nr, li);
1319 }
1320 
1322 {
1324  fr_app_io_t const *app_io;
1325  size_t size;
1326  int num_messages;
1327 
1328  fr_assert(li->app_io != NULL);
1329 
1330  /*
1331  * Non-socket listeners just get told about the event
1332  * list, and nothing else.
1333  */
1334  if (li->non_socket_listener) {
1335  fr_assert(li->app_io->event_list_set != NULL);
1336  fr_assert(!li->app_io->read);
1337  fr_assert(!li->app_io->write);
1338 
1339  li->app_io->event_list_set(li, nr->el, nr);
1340 
1341  /*
1342  * We use fr_log() here to avoid the "Network - " prefix.
1343  */
1344  fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listener %s bound to virtual server %s",
1345  li->name, cf_section_name2(li->server_cs));
1346 
1347  return 0;
1348  }
1349 
1350  s = talloc_zero(nr, fr_network_socket_t);
1351  fr_assert(s != NULL);
1352  talloc_steal(s, li);
1353 
1354  s->nr = nr;
1355  s->listen = li;
1356  s->number = nr->num_sockets++;
1357 
1358  MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1359 
1360  talloc_set_destructor(s, _network_socket_free);
1361 
1362  /*
1363  * Put reasonable limits on the ring buffer size. Then
1364  * round it up to the nearest power of 2, which is
1365  * required by the ring buffer code.
1366  */
1367  num_messages = s->listen->num_messages;
1368  if (num_messages < 8) num_messages = 8;
1369 
1370  size = s->listen->default_message_size * num_messages;
1371  if (size < (1 << 17)) size = (1 << 17);
1372  if (size > (100 * 1024 * 1024)) size = (100 * 1024 * 1024);
1373 
1374  /*
1375  * Allocate the ring buffer for messages and packets.
1376  */
1377  s->ms = fr_message_set_create(s, num_messages,
1378  sizeof(fr_channel_data_t),
1379  size);
1380  if (!s->ms) {
1381  PERROR("Failed creating message buffers for network IO");
1382  talloc_free(s);
1383  return -1;
1384  }
1385 
1386  app_io = s->listen->app_io;
1388 
1389  if (fr_event_fd_insert(nr, NULL, nr->el, s->listen->fd,
1393  s) < 0) {
1394  PERROR("Failed adding new socket to network event loop");
1395  talloc_free(s);
1396  return -1;
1397  }
1398 
1399  /*
1400  * Start of with write updates being paused. We don't
1401  * care about being able to write if there's nothing to
1402  * write.
1403  */
1405 
1406  /*
1407  * Add the listener before calling the app_io, so that
1408  * the app_io can find the listener which we're adding
1409  * here.
1410  */
1411  (void) fr_rb_insert(nr->sockets, s);
1412  (void) fr_rb_insert(nr->sockets_by_num, s);
1413 
1414  if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1415 
1416  /*
1417  * We use fr_log() here to avoid the "Network - " prefix.
1418  */
1419  fr_log(nr->log, L_DBG, __FILE__, __LINE__, "Listening on %s bound to virtual server %s",
1421 
1422  DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
1423 
1424  return 0;
1425 }
1426 
1427 /** Handle a network control message callback for a new "watch directory"
1428  *
 * The payload is expected to be a single pointer to the fr_listen_t to be
 * watched.  The new socket takes ownership of that listener (talloc_steal()
 * below), so it is freed together with the socket.
 *
1429  * @param[in] ctx the network
1430  * @param[in] data the message
1431  * @param[in] data_size size of the data
1432  * @param[in] now the current time
1433  */
1434 static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1435 {
1436  int num_messages;
1437  fr_network_t *nr = talloc_get_type_abort(ctx, fr_network_t);
1438  fr_listen_t *li = talloc_get_type_abort(*((void * const *)data), fr_listen_t);
1440  fr_app_io_t const *app_io;
1442 
/*
 * NOTE(review): fr_cond_assert() evaluates to the value of its condition.
 * This statement therefore returns when data_size IS the expected size,
 * i.e. on every well-formed message, and the rest of this function never
 * runs.  The intent appears to be "if (!fr_cond_assert(...)) return;" --
 * confirm and fix.  Note also that `data` has already been dereferenced
 * (and type-checked) in the initialiser of `li` above, before this size
 * check.  The memcpy() below then re-reads the same pointer.
 */
1443  if (fr_cond_assert(data_size == sizeof(li))) return;
1444 
1445  memcpy(&li, data, sizeof(li));
1446 
/*
 * Create the socket wrapper and hand the listener over to it.
 */
1447  s = talloc_zero(nr, fr_network_socket_t);
1448  fr_assert(s != NULL);
1449  talloc_steal(s, li);
1450 
1451  s->nr = nr;
1452  s->listen = li;
1453  s->number = nr->num_sockets++;
1454 
1455  MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
1456 
1457  talloc_set_destructor(s, _network_socket_free);
1458 
1459  /*
1460  * Allocate the ring buffer for messages and packets.
1461  */
1462  num_messages = s->listen->num_messages;
1463  if (num_messages < 8) num_messages = 8;
1464 
1465  s->ms = fr_message_set_create(s, num_messages,
1466  sizeof(fr_channel_data_t),
1468  if (!s->ms) {
1469  PERROR("Failed creating message buffers for directory IO");
1470  talloc_free(s);
1471  return;
1472  }
1473 
/*
 * Give the app_io the event list before the fd is registered, so any
 * state it needs already exists when events start arriving.
 */
1474  app_io = s->listen->app_io;
1475 
1476  if (app_io->event_list_set) app_io->event_list_set(s->listen, nr->el, nr);
1477 
1479 
/*
 * The error callback is only installed when the app_io can handle errors.
 */
1480  if (fr_event_filter_insert(nr, NULL, nr->el, s->listen->fd, s->filter,
1481  &funcs,
1482  app_io->error ? fr_network_error : NULL,
1483  s) < 0) {
1484  PERROR("Failed adding directory monitor event loop");
1485  talloc_free(s);
1486  return;
1487  }
1488 
/*
 * Index the socket by listener and by number.  The insert results are ignored.
 */
1489  (void) fr_rb_insert(nr->sockets, s);
1490  (void) fr_rb_insert(nr->sockets_by_num, s);
1491 
1492  DEBUG3("Using new socket with FD %d", s->listen->fd);
1493 }
1494 
1495 /** Handle a network control message callback for a new worker
1496  *
1497  * @param[in] ctx the network
1498  * @param[in] data the message
1499  * @param[in] data_size size of the data
1500  * @param[in] now the current time
1501  */
1502 static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1503 {
1504  int i;
1505  fr_network_t *nr = ctx;
1506  fr_worker_t *worker;
1508 
1509  fr_assert(data_size == sizeof(worker));
1510 
1511  memcpy(&worker, data, data_size);
1512  (void) talloc_get_type_abort(worker, fr_worker_t);
1513 
1514  MEM(w = talloc_zero(nr, fr_network_worker_t));
1515 
1516  w->worker = worker;
1517  w->channel = fr_worker_channel_create(worker, w, nr->control);
1519  fr_fatal_assert_msg(w->channel, "Failed creating new channel");
1520 
1523 
1524  /*
1525  * FIXME: This creates a race in the network loop
1526  * exit condition, because it can theoretically
1527  * be signalled to exit before the workers have
1528  * ACKd channel creation.
1529  */
1530  nr->num_workers++;
1531 
1532  /*
1533  * Insert the worker into the array of workers.
1534  */
1535  for (i = 0; i < nr->max_workers; i++) {
1536  if (nr->workers[i]) continue;
1537 
1538  nr->workers[i] = w;
1539  return;
1540  }
1541 
1542  /*
1543  * Run out of room to put workers!
1544  */
1545  fr_assert(0 == 1);
1546 }
1547 
1548 /** Handle a network control message callback for a packet sent to a socket
1549  *
1550  * @param[in] ctx the network
1551  * @param[in] data the message
1552  * @param[in] data_size size of the data
1553  * @param[in] now the current time
1554  */
1555 static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
1556 {
1557  fr_network_t *nr = ctx;
1558  fr_network_inject_t my_inject;
1560 
1561  fr_assert(data_size == sizeof(my_inject));
1562 
1563  memcpy(&my_inject, data, data_size);
1564  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = my_inject.listen });
1565  if (!s) {
1566  talloc_free(my_inject.packet); /* MUST be it's own TALLOC_CTX */
1567  return;
1568  }
1569 
1570  /*
1571  * Inject the packet, and then read it back from the
1572  * network.
1573  */
1574  if (s->listen->app_io->inject(s->listen, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
1575  fr_network_read(nr->el, s->listen->fd, 0, s);
1576  }
1577 
1578  talloc_free(my_inject.packet);
1579 }
1580 
1581 /** Run the event loop 'pre' callback
1582  *
1583  * This function MUST DO NO WORK. All it does is check if there's
1584  * work, and tell the event code to return to the main loop if
1585  * there's work to do.
1586  *
1587  * @param[in] now the current time.
1588  * @param[in] wake the time when the event loop will wake up.
1589  * @param[in] uctx the network
1590  */
1592 {
1593  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1594 
1595  if (fr_heap_num_elements(nr->replies) > 0) return 1;
1596 
1597  return 0;
1598 }
1599 
1600 /** Handle replies after all FD and timer events have been serviced
1601  *
1602  * @param el the event loop
1603  * @param now the current time (mostly)
1604  * @param uctx the fr_network_t
1605  */
1607 {
1608  fr_channel_data_t *cd;
1609  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1610 
1611  /*
1612  * Pull the replies off of our global heap, and try to
1613  * push them to the individual sockets.
1614  */
1615  while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1616  fr_listen_t *li;
1618 
1619  li = cd->listen;
1620 
1621  /*
1622  * @todo - cache this somewhere so we don't need
1623  * to do an rbtree lookup for every packet.
1624  */
1625  s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
1626 
1627  /*
1628  * This shouldn't happen, but be safe...
1629  */
1630  if (!s) {
1631  fr_message_done(&cd->m);
1632  continue;
1633  }
1634 
1635  if (cd->m.status != FR_MESSAGE_LOCALIZED) {
1636  fr_assert(s->outstanding > 0);
1637  s->outstanding--;
1638  }
1639 
1640  /*
1641  * Just mark the message done, and skip it.
1642  */
1643  if (s->dead) {
1644  fr_message_done(&cd->m);
1645 
1646  /*
1647  * No more packets, it's safe to delete
1648  * the socket.
1649  */
1650  if (!s->outstanding) talloc_free(s);
1651 
1652  continue;
1653  }
1654 
1655  /*
1656  * No data to write to the socket, so we skip the message.
1657  */
1658  if (!cd->m.data_size) {
1659  fr_message_done(&cd->m);
1660  continue;
1661  }
1662 
1663  /*
1664  * No pending message, let's try writing it.
1665  *
1666  * If there is a pending message, then we're
1667  * waiting for IO write to become ready.
1668  */
1669  if (!s->pending) {
1670  fr_assert(!s->blocked);
1671  (void) fr_heap_insert(&s->waiting, cd);
1672  fr_network_write(nr->el, s->listen->fd, 0, s);
1673  }
1674  }
1675 }
1676 
1677 /** Stop a network thread in an orderly way
1678  *
1679  * @param[in] nr the network to stop
1680  */
1682 {
1683  fr_channel_data_t *cd;
1684 
1685  (void) talloc_get_type_abort(nr, fr_network_t);
1686 
1687  /*
1688  * Close the network sockets
1689  */
1690  {
1691  fr_network_socket_t **sockets;
1692  size_t len;
1693  size_t i;
1694 
1695  if (fr_rb_flatten_inorder(nr, (void ***)&sockets, nr->sockets) < 0) return -1;
1696  len = talloc_array_length(sockets);
1697 
1698  for (i = 0; i < len; i++) {
1699  /*
1700  * Force to zero so we don't trigger asserts
1701  * if packets are being processed and the
1702  * server exits.
1703  */
1704  sockets[i]->outstanding = 0;
1705  talloc_free(sockets[i]);
1706  }
1707 
1708  talloc_free(sockets);
1709  }
1710 
1711 
1712  /*
1713  * Clean up all outstanding replies.
1714  *
1715  * We can't do this after signalling the
1716  * workers to close, because they free
1717  * their message sets, and we end up
1718  * getting random use-after-free errors
1719  * as there's a race between the network
1720  * popping replies, and the workers
1721  * freeing their message sets.
1722  *
1723  * This isn't perfect, and we might still
1724  * lose some replies, but it's good enough
1725  * for now.
1726  *
1727  * @todo - call transport "done" for the reply, so that
1728  * it knows the replies are done, too.
1729  */
1730  while ((cd = fr_heap_pop(&nr->replies)) != NULL) {
1731  fr_message_done(&cd->m);
1732  }
1733 
1734  /*
1735  * Signal the workers that we're closing
1736  *
1737  * nr->num_workers is decremented every
1738  * time a worker closes a socket.
1739  *
1740  * When nr->num_workers == 0, the event
1741  * loop (fr_network()) will exit.
1742  */
1743  {
1744  int i;
1745 
1746  for (i = 0; i < nr->num_workers; i++) {
1747  fr_network_worker_t *worker = nr->workers[i];
1748 
1750  }
1751  }
1752 
1753  (void) fr_event_pre_delete(nr->el, fr_network_pre_event, nr);
1755  nr->exiting = true;
1757 
1758  return 0;
1759 }
1760 
1761 /** Read handler for signal pipe
1762  *
1763  */
1764 static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
1765 {
1766  fr_network_t *nr = talloc_get_type_abort(uctx, fr_network_t);
1767  uint8_t buff;
1768 
1769  if (read(fd, &buff, sizeof(buff)) < 0) {
1770  ERROR("Failed reading signal - %s", fr_syserror(errno));
1771  return;
1772  }
1773 
1774  fr_assert(buff == 1);
1775 
1776  /*
1777  * fr_network_stop() will signal the workers
1778  * to exit (by closing their channels).
1779  *
1780  * When we get the ack, we decrement our
1781  * nr->num_workers counter.
1782  *
1783  * When the counter reaches 0, the event loop
1784  * exits.
1785  */
1786  DEBUG2("Signalled to exit");
1787  fr_network_destroy(nr);
1788 }
1789 
1790 /** The main network worker function.
1791  *
1792  * @param[in] nr the network data structure to run.
1793  */
1795 {
1796  /*
1797  * Run until we're told to exit AND the number of
1798  * workers has dropped to zero.
1799  *
1800  * This is important as if we exit too early we
1801  * free the channels out from underneath the
1802  * workers and they read uninitialised memory.
1803  *
1804  * Whenever a worker ACKs our close notification
1805  * nr->num_workers is decremented, so when
1806  * nr->num_workers == 0, all workers have ACKd
1807  * our close and are no longer using the channel.
1808  */
1809  while (likely(!(nr->exiting && (nr->num_workers == 0)))) {
1810  bool wait_for_event;
1811  int num_events;
1812 
1813  /*
1814  * There are runnable requests. We still service
1815  * the event loop, but we don't wait for events.
1816  */
1817  wait_for_event = (fr_heap_num_elements(nr->replies) == 0);
1818 
1819  /*
1820  * Check the event list. If there's an error
1821  * (e.g. exit), we stop looping and clean up.
1822  */
1823  DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1824  num_events = fr_event_corral(nr->el, fr_time(), wait_for_event);
1825  DEBUG4("%u event(s) pending%s",
1826  num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1827  if (num_events < 0) break;
1828 
1829  /*
1830  * Service outstanding events.
1831  */
1832  if (num_events > 0) {
1833  DEBUG4("Servicing event(s)");
1834  fr_event_service(nr->el);
1835  }
1836  }
1837  return;
1838 }
1839 
1840 /** Signal a network thread to exit
1841  *
1842  * @note Request to exit will be processed asynchronously.
1843  *
1844  * @param[in] nr the network data structure to manage
1845  * @return
1846  * - 0 on success.
1847  * - -1 on failure.
1848  */
1850 {
1851  if (write(nr->signal_pipe[1], &(uint8_t){ 0x01 }, 1) < 0) {
1852  fr_strerror_printf("Failed signalling network thread to exit - %s", fr_syserror(errno));
1853  return -1;
1854  }
1855 
1856  return 0;
1857 }
1858 
1859 /** Free any resources associated with a network thread
1860  *
1861  */
1863 {
1864  if (nr->signal_pipe[0] >= 0) close(nr->signal_pipe[0]);
1865  if (nr->signal_pipe[1] >= 0) close(nr->signal_pipe[1]);
1866 
1867  return 0;
1868 }
1869 
1870 /** Create a network
1871  *
1872  * @param[in] ctx The talloc ctx
1873  * @param[in] el The event list
1874  * @param[in] name Networker identifier.
1875  * @param[in] logger The destination for all logging messages
1876  * @param[in] lvl Log level
1877  * @param[in] config configuration structure.
1878  * @return
1879  * - NULL on error
1880  * - fr_network_t on success
1881  */
1882 fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
1883  fr_log_t const *logger, fr_log_lvl_t lvl,
1884  fr_network_config_t const *config)
1885 {
1886  fr_network_t *nr;
1887 
1888  nr = talloc_zero(ctx, fr_network_t);
1889  if (!nr) {
1890  fr_strerror_const("Failed allocating memory");
1891  return NULL;
1892  }
1893  talloc_set_destructor(nr, _fr_network_free);
1894 
1895  nr->name = talloc_strdup(nr, name);
1896 
1897  nr->thread_id = pthread_self();
1898  nr->el = el;
1899  nr->log = logger;
1900  nr->lvl = lvl;
1901 
1902  nr->max_workers = MAX_WORKERS;
1903  nr->num_workers = 0;
1904  nr->signal_pipe[0] = -1;
1905  nr->signal_pipe[1] = -1;
1906  if (config) nr->config = *config;
1907 
1908  nr->aq_control = fr_atomic_queue_alloc(nr, 1024);
1909  if (!nr->aq_control) {
1910  talloc_free(nr);
1911  return NULL;
1912  }
1913 
1914  nr->control = fr_control_create(nr, el, nr->aq_control);
1915  if (!nr->control) {
1916  fr_strerror_const_push("Failed creating control queue");
1917  fail:
1918  talloc_free(nr);
1919  return NULL;
1920  }
1921 
1922  /*
1923  * @todo - rely on thread-local variables. And then the
1924  * various users of this can check if (rb == nr->rb), and
1925  * if so, skip the whole control plane / kevent /
1926  * whatever roundabout thing.
1927  */
1929  if (!nr->rb) {
1930  fr_strerror_const_push("Failed creating ring buffer");
1931  fail2:
1932  talloc_free(nr->control);
1933  goto fail;
1934  }
1935 
1937  fr_strerror_const_push("Failed adding channel callback");
1938  goto fail2;
1939  }
1940 
1942  fr_strerror_const_push("Failed adding socket callback");
1943  goto fail2;
1944  }
1945 
1947  fr_strerror_const_push("Failed adding socket callback");
1948  goto fail2;
1949  }
1950 
1952  fr_strerror_const_push("Failed adding worker callback");
1953  goto fail2;
1954  }
1955 
1957  fr_strerror_const_push("Failed adding packet injection callback");
1958  goto fail2;
1959  }
1960 
1961  /*
1962  * Create the various heaps.
1963  */
1965  if (!nr->sockets) {
1966  fr_strerror_const_push("Failed creating listen tree for sockets");
1967  goto fail2;
1968  }
1969 
1971  if (!nr->sockets_by_num) {
1972  fr_strerror_const_push("Failed creating number tree for sockets");
1973  goto fail2;
1974  }
1975 
1976  nr->replies = fr_heap_alloc(nr, reply_cmp, fr_channel_data_t, channel.heap_id, 0);
1977  if (!nr->replies) {
1978  fr_strerror_const_push("Failed creating heap for replies");
1979  goto fail2;
1980  }
1981 
1982  if (fr_event_pre_insert(nr->el, fr_network_pre_event, nr) < 0) {
1983  fr_strerror_const("Failed adding pre-check to event list");
1984  goto fail2;
1985  }
1986 
1987  if (fr_event_post_insert(nr->el, fr_network_post_event, nr) < 0) {
1988  fr_strerror_const("Failed inserting post-processing event");
1989  goto fail2;
1990  }
1991 
1992  if (pipe(nr->signal_pipe) < 0) {
1993  fr_strerror_printf("Failed initialising signal pipe - %s", fr_syserror(errno));
1994  goto fail2;
1995  }
1996  if (fr_nonblock(nr->signal_pipe[0]) < 0) goto fail2;
1997  if (fr_nonblock(nr->signal_pipe[1]) < 0) goto fail2;
1998 
1999  if (fr_event_fd_insert(nr, NULL, nr->el, nr->signal_pipe[0], _signal_pipe_read, NULL, NULL, nr) < 0) {
2000  fr_strerror_const("Failed inserting event for signal pipe");
2001  goto fail2;
2002  }
2003 
2004  return nr;
2005 }
2006 
2007 int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
2008 {
2009  if (num < 0) return -1;
2010  if (num == 0) return 0;
2011 
2012  stats[0] = nr->stats.in;
2013  if (num >= 2) stats[1] = nr->stats.out;
2014  if (num >= 3) stats[2] = nr->stats.dup;
2015  if (num >= 4) stats[3] = nr->stats.dropped;
2016  if (num >= 5) stats[4] = nr->num_workers;
2017 
2018  if (num <= 5) return num;
2019 
2020  return 5;
2021 }
2022 
2023 void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
2024 {
2025  int i;
2026 
2027  /*
2028  * Dump all of the channel statistics.
2029  */
2030  for (i = 0; i < nr->max_workers; i++) {
2031  if (!nr->workers[i]) continue;
2032 
2033  fr_channel_stats_log(nr->workers[i]->channel, log, __FILE__, __LINE__);
2034  }
2035 }
2036 
2037 static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2038 {
2039  fr_network_t const *nr = ctx;
2040 
2041  fprintf(fp, "count.in\t%" PRIu64 "\n", nr->stats.in);
2042  fprintf(fp, "count.out\t%" PRIu64 "\n", nr->stats.out);
2043  fprintf(fp, "count.dup\t%" PRIu64 "\n", nr->stats.dup);
2044  fprintf(fp, "count.dropped\t%" PRIu64 "\n", nr->stats.dropped);
2045  fprintf(fp, "count.sockets\t%u\n", fr_rb_num_elements(nr->sockets));
2046 
2047  return 0;
2048 }
2049 
2050 static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
2051 {
2052  fr_network_t const *nr = ctx;
2055 
2056  // @todo - note that this isn't thread-safe!
2057 
2058  for (s = fr_rb_iter_init_inorder(&iter, nr->sockets);
2059  s != NULL;
2060  s = fr_rb_iter_next_inorder(&iter)) {
2061  if (!s->listen->app_io->get_name) {
2062  fprintf(fp, "%s\n", s->listen->app_io->common.name);
2063  } else {
2064  fprintf(fp, "%d\t%s\n", s->number, s->listen->app_io->get_name(s->listen));
2065  }
2066  }
2067  return 0;
2068 }
2069 
2070 static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
2071 {
2072  fr_network_t const *nr = ctx;
2074 
2075  s = fr_rb_find(nr->sockets_by_num, &(fr_network_socket_t){ .number = info->box[0]->vb_uint32 });
2076  if (!s) {
2077  fprintf(fp_err, "No such socket number '%s'.\n", info->argv[0]);
2078  return -1;
2079  }
2080 
2081  fprintf(fp, "count.in\t%" PRIu64 "\n", s->stats.in);
2082  fprintf(fp, "count.out\t%" PRIu64 "\n", s->stats.out);
2083  fprintf(fp, "count.dup\t%" PRIu64 "\n", s->stats.dup);
2084  fprintf(fp, "count.dropped\t%" PRIu64 "\n", s->stats.dropped);
2085 
2086  return 0;
2087 }
2088 
2089 
2091  {
2092  .parent = "stats",
2093  .name = "network",
2094  .help = "Statistics for network threads.",
2095  .read_only = true
2096  },
2097 
2098  {
2099  .parent = "stats network",
2100  .add_name = true,
2101  .name = "self",
2102  .func = cmd_stats_self,
2103  .help = "Show statistics for a specific network thread.",
2104  .read_only = true
2105  },
2106 
2107  {
2108  .parent = "stats network",
2109  .add_name = true,
2110  .name = "socket",
2111  .syntax = "INTEGER",
2112  .func = cmd_stats_socket,
2113  .help = "Show statistics for a specific socket",
2114  .read_only = true
2115  },
2116 
2117  {
2118  .parent = "show",
2119  .name = "network",
2120  .help = "Show information about network threads.",
2121  .read_only = true
2122  },
2123 
2124  {
2125  .parent = "show network",
2126  .add_name = true,
2127  .name = "socket",
2128  .syntax = "list",
2129  .func = cmd_socket_list,
2130  .help = "List the sockets associated with this network thread.",
2131  .read_only = true
2132  },
2133 
2135 };
static int const char char buffer[256]
Definition: acutest.h:574
fr_io_close_t close
Close the transport.
Definition: app_io.h:60
fr_io_data_read_t read
Read from a socket to a data buffer.
Definition: app_io.h:47
module_t common
Common fields to all loadable modules.
Definition: app_io.h:34
fr_io_signal_t error
There was an error on the socket.
Definition: app_io.h:59
fr_app_event_list_set_t event_list_set
Called by the network thread to pass an event list for use by the app_io_t.
Definition: app_io.h:36
fr_io_data_inject_t inject
Inject a packet into a socket.
Definition: app_io.h:50
fr_io_data_vnode_t vnode
Handle notifications that the VNODE has changed.
Definition: app_io.h:52
fr_io_data_write_t write
Write from a data buffer to a socket.
Definition: app_io.h:48
fr_io_name_t get_name
get the socket name
Definition: app_io.h:70
Public structure describing an I/O path for a protocol.
Definition: app_io.h:33
fr_app_priority_get_t priority
Assign a priority to the packet.
Definition: application.h:90
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition: atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define RCSID(id)
Definition: build.h:481
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define UNUSED
Definition: build.h:313
char const * cf_section_name2(CONF_SECTION const *cs)
Return the second identifier of a CONF_SECTION.
Definition: cf_util.c:1185
void * fr_channel_requestor_uctx_get(fr_channel_t *ch)
Get network-specific data from a channel.
Definition: channel.c:922
fr_table_num_sorted_t const channel_signals[]
Definition: channel.c:153
bool fr_channel_recv_reply(fr_channel_t *ch)
Receive a reply message from the channel.
Definition: channel.c:408
int fr_channel_signal_responder_close(fr_channel_t *ch)
Signal a responder that the channel is closing.
Definition: channel.c:824
int fr_channel_send_request(fr_channel_t *ch, fr_channel_data_t *cd)
Send a request message into the channel.
Definition: channel.c:306
int fr_channel_set_recv_reply(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_reply)
Definition: channel.c:930
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
void fr_channel_requestor_uctx_add(fr_channel_t *ch, void *uctx)
Add network-specific data to a channel.
Definition: channel.c:910
void fr_channel_stats_log(fr_channel_t const *ch, fr_log_t const *log, char const *file, int line)
Definition: channel.c:963
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_EMPTY
Definition: channel.h:75
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_ERROR
Definition: channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition: channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition: channel.h:146
#define PRIORITY_NORMAL
Definition: channel.h:151
#define PRIORITY_NOW
Definition: channel.h:149
uint32_t priority
Priority of this packet.
Definition: channel.h:140
Channel information which is added to a message.
Definition: channel.h:104
char const * parent
e.g. "show module"
Definition: command.h:52
#define CMD_TABLE_END
Definition: command.h:62
char const ** argv
text version of commands
Definition: command.h:42
#define FR_CONTROL_ID_INJECT
Definition: control.h:60
#define FR_CONTROL_ID_DIRECTORY
Definition: control.h:59
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define FR_CONTROL_ID_LISTEN
Definition: control.h:57
#define FR_CONTROL_ID_WORKER
Definition: control.h:58
#define FR_CONTROL_MAX_SIZE
Definition: control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition: control.h:50
static fr_ring_buffer_t * rb
Definition: control_test.c:51
next
Definition: dcursor.h:178
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
fr_dcursor_iter_t iter
Definition: dcursor.h:147
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:139
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:156
#define fr_fatal_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:184
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
static int sockfd
Definition: dhcpclient.c:56
#define fr_event_fd_insert(...)
Definition: event.h:232
fr_event_filter_t
The type of filter to install for an FD.
Definition: event.h:61
@ FR_EVENT_FILTER_VNODE
Filter for vnode subfilters.
Definition: event.h:63
@ FR_EVENT_FILTER_IO
Combined filter for read/write functions/.
Definition: event.h:62
#define fr_event_filter_update(...)
Definition: event.h:224
#define fr_event_filter_insert(...)
Definition: event.h:219
#define FR_EVENT_RESUME(_s, _f)
Re-add the filter for a func from kevent.
Definition: event.h:110
#define FR_EVENT_SUSPEND(_s, _f)
Temporarily remove the filter for a func from kevent.
Definition: event.h:94
fr_event_fd_cb_t extend
Additional files were added to a directory.
Definition: event.h:183
Callbacks for the FR_EVENT_FILTER_IO filter.
Definition: event.h:173
Structure describing a modification to a filter's state.
Definition: event.h:75
Callbacks for the FR_EVENT_FILTER_VNODE filter.
Definition: event.h:180
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition: heap.c:322
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
unsigned int fr_heap_index_t
Definition: heap.h:80
#define fr_heap_alloc(_ctx, _cmp, _type, _field, _init)
Creates a heap that can be used with non-talloced elements.
Definition: heap.h:100
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define FR_HEAP_INDEX_INVALID
Definition: heap.h:83
The main heap structure.
Definition: heap.h:66
uint64_t out
Definition: base.h:43
uint64_t dup
Definition: base.h:44
uint64_t dropped
Definition: base.h:45
uint64_t in
Definition: base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition: control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
The control structure.
Definition: control.c:79
size_t num_messages
for the message ring buffer
Definition: listen.h:52
bool non_socket_listener
special internal listener that does not use sockets.
Definition: listen.h:45
char const * name
printable name for this socket - set by open
Definition: listen.h:29
void const * app_instance
Definition: listen.h:38
size_t default_message_size
copied from app_io, but may be changed
Definition: listen.h:51
fr_app_t const * app
Definition: listen.h:37
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition: listen.h:40
bool no_write_callback
sometimes we don't need to do writes
Definition: listen.h:44
int fd
file descriptor for this socket - set by open
Definition: listen.h:28
bool needs_full_setup
Set to true to avoid the short cut when adding the listener.
Definition: listen.h:46
fr_app_io_t const * app_io
I/O path functions.
Definition: listen.h:31
fr_ring_buffer_t * rb
ring buffer for my control-plane messages
Definition: network.c:126
fr_cmd_table_t cmd_network_table[]
Definition: network.c:2090
size_t fr_network_listen_outstanding(fr_network_t *nr, fr_listen_t *li)
Get the number of outstanding packets.
Definition: network.c:805
size_t written
however much we did in a partial write
Definition: network.c:92
int fr_network_listen_send_packet(fr_network_t *nr, fr_listen_t *parent, fr_listen_t *li, const uint8_t *buffer, size_t buflen, fr_time_t recv_time, void *packet_ctx)
Send a packet to the worker.
Definition: network.c:763
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition: network.c:122
static int cmd_stats_socket(FILE *fp, FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition: network.c:2070
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition: network.c:236
bool suspended
whether or not we're suspended.
Definition: network.c:117
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in a different thread.
Definition: network.c:293
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition: network.c:1681
fr_network_t * nr
O(N) issues in talloc.
Definition: network.c:77
fr_io_stats_t stats
Definition: network.c:70
fr_listen_t * listen
Definition: network.c:52
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new listener.
Definition: network.c:1307
uint8_t * packet
Definition: network.c:53
static int cmd_stats_self(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition: network.c:2037
fr_log_t const * log
log destination
Definition: network.c:119
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition: network.c:278
static int _fr_network_free(fr_network_t *nr)
Free any resources associated with a network thread.
Definition: network.c:1862
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a packet sent to a socket.
Definition: network.c:1555
fr_heap_index_t heap_id
for the sockets_by_num heap
Definition: network.c:79
#define RTT(_old, _new)
Definition: network.c:478
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition: network.c:1882
void fr_network(fr_network_t *nr)
The main network worker function.
Definition: network.c:1794
fr_message_set_t * ms
message buffers for this socket.
Definition: network.c:89
int fr_network_listen_delete(fr_network_t *nr, fr_listen_t *li)
Delete a socket from a network.
Definition: network.c:259
int num_blocked
number of blocked workers
Definition: network.c:138
static void fr_network_worker_started_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
char const * name
Network ID for logging.
Definition: network.c:113
void fr_network_worker_add_self(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in the same thread.
Definition: network.c:313
unsigned int outstanding
number of outstanding packets sent to the worker
Definition: network.c:86
static _Thread_local fr_ring_buffer_t * fr_network_rb
Definition: network.c:49
int number
unique ID
Definition: network.c:78
static fr_event_update_t const resume_write[]
Definition: network.c:1125
fr_time_delta_t predicted
predicted processing time for one packet
Definition: network.c:64
static int fr_network_pre_event(fr_time_t now, fr_time_delta_t wake, void *uctx)
fr_worker_t * worker
worker pointer
Definition: network.c:69
int fr_network_sendto_worker(fr_network_t *nr, fr_listen_t *li, void *packet_ctx, uint8_t const *data, size_t data_len, fr_time_t recv_time)
Definition: network.c:1030
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition: network.c:1849
#define MAX_WORKERS
Definition: network.c:47
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, fr_time_t recv_time)
Inject a packet for a listener to read.
Definition: network.c:398
fr_listen_t * listen
I/O ctx and functions.
Definition: network.c:87
int num_sockets
actually a counter...
Definition: network.c:141
fr_rb_node_t listen_node
rbtree node for looking up by listener.
Definition: network.c:74
static void fr_network_vnode_extend(UNUSED fr_event_list_t *el, int sockfd, int fflags, void *ctx)
Get a notification that a vnode changed.
Definition: network.c:1075
static void _signal_pipe_read(UNUSED fr_event_list_t *el, int fd, UNUSED int flags, void *uctx)
Read handler for signal pipe.
Definition: network.c:1764
#define OUTSTANDING(_x)
Definition: network.c:595
static int8_t reply_cmp(void const *one, void const *two)
Definition: network.c:157
int num_workers
number of active workers
Definition: network.c:137
static int8_t socket_num_cmp(void const *one, void const *two)
Definition: network.c:186
int num_pending_workers
number of workers we're waiting to start.
Definition: network.c:139
fr_rb_tree_t * sockets
list of sockets we're managing, ordered by the listener
Definition: network.c:134
pthread_t thread_id
for self
Definition: network.c:115
fr_log_lvl_t lvl
debug log level
Definition: network.c:120
int signal_pipe[2]
Pipe for signalling the worker in an orderly way.
Definition: network.c:143
fr_channel_data_t * pending
the currently pending partial packet
Definition: network.c:94
static int8_t socket_listen_cmp(void const *one, void const *two)
Definition: network.c:179
static void fr_network_write(UNUSED fr_event_list_t *el, UNUSED int sockfd, UNUSED int flags, void *ctx)
Write packets to the network.
Definition: network.c:1138
fr_event_list_t * el
our event list
Definition: network.c:128
fr_heap_t * replies
replies from the worker, ordered by priority / origin time
Definition: network.c:130
static int fr_network_send_request(fr_network_t *nr, fr_channel_data_t *cd)
Send a message on the "best" channel.
Definition: network.c:602
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition: network.c:2023
fr_heap_t * waiting
packets waiting to be written
Definition: network.c:95
int fr_network_stats(fr_network_t const *nr, int num, uint64_t *stats)
Definition: network.c:2007
fr_heap_index_t heap_id
workers are in a heap
Definition: network.c:62
bool blocked
is this worker blocked?
Definition: network.c:66
static void fr_network_read(UNUSED fr_event_list_t *el, int sockfd, UNUSED int flags, void *ctx)
Read a packet from the network.
Definition: network.c:862
static bool is_network_thread(fr_network_t const *nr)
Definition: network.c:224
fr_rb_node_t num_node
rbtree node for looking up by number.
Definition: network.c:75
static void fr_network_error(UNUSED fr_event_list_t *el, UNUSED int sockfd, int flags, int fd_errno, void *ctx)
Handle errors for a socket.
Definition: network.c:1100
fr_io_stats_t stats
Definition: network.c:96
fr_channel_data_t * cd
cached in case of allocation & read error
Definition: network.c:90
static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
Definition: network.c:1321
static void fr_network_suspend(fr_network_t *nr)
Definition: network.c:439
bool dead
is it dead?
Definition: network.c:83
size_t leftover
leftover data from a previous read
Definition: network.c:91
static void fr_network_post_event(fr_event_list_t *el, fr_time_t now, void *uctx)
fr_network_worker_t * workers[MAX_WORKERS]
each worker
Definition: network.c:149
fr_time_t recv_time
Definition: network.c:55
static void fr_network_unsuspend(fr_network_t *nr)
Definition: network.c:458
fr_rb_tree_t * sockets_by_num
ordered by number;
Definition: network.c:135
fr_network_config_t config
configuration
Definition: network.c:148
void fr_network_listen_read(fr_network_t *nr, fr_listen_t *li)
Signal the network to read from a listener.
Definition: network.c:324
static int8_t waiting_cmp(void const *one, void const *two)
Definition: network.c:168
fr_io_stats_t stats
Definition: network.c:132
size_t packet_len
Definition: network.c:54
static fr_ring_buffer_t * fr_network_rb_init(void)
Initialise thread local storage.
Definition: network.c:206
static int _fr_network_rb_free(void *arg)
Definition: network.c:197
static void fr_network_recv_reply(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the network side.
Definition: network.c:486
int max_workers
maximum number of allowed workers
Definition: network.c:140
void fr_network_listen_write(fr_network_t *nr, fr_listen_t *li, uint8_t const *packet, size_t packet_len, void *packet_ctx, fr_time_t request_time)
Inject a packet for a listener to write.
Definition: network.c:350
bool exiting
are we exiting?
Definition: network.c:146
fr_event_filter_t filter
what type of filter it is
Definition: network.c:81
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
Definition: network.c:821
static void fr_network_directory_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
Handle a network control message callback for a new "watch directory".
Definition: network.c:1434
fr_channel_t * channel
channel to the worker
Definition: network.c:68
static fr_event_update_t const pause_write[]
Definition: network.c:1120
static int _network_socket_free(fr_network_socket_t *s)
Definition: network.c:1263
static void fr_network_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a network control message callback for a channel.
Definition: network.c:534
static int cmd_socket_list(FILE *fp, UNUSED FILE *fp_err, void *ctx, UNUSED fr_cmd_info_t const *info)
Definition: network.c:2050
fr_time_delta_t cpu_time
how much CPU time this worker has spent
Definition: network.c:63
fr_control_t * control
the control plane
Definition: network.c:124
bool blocked
is it blocked?
Definition: network.c:84
Associate a worker thread with a network thread.
Definition: network.c:61
uint32_t max_outstanding
Definition: network.h:46
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition: log.h:641
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition: event.c:2335
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition: event.c:2549
int fr_event_pre_delete(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Delete a pre-event callback from the event list.
Definition: event.c:2281
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition: event.c:2414
talloc_free(reap)
int fr_event_post_insert(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition: event.c:2313
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition: event.c:2259
int fr_event_fd_delete(fr_event_list_t *el, int fd, fr_event_filter_t filter)
Remove a file descriptor from the event loop.
Definition: event.c:1260
Stores all information relating to an event list.
Definition: event.c:411
void fr_log(fr_log_t const *log, fr_log_type_t type, char const *file, int line, char const *fmt,...)
Send a server log message to its destination.
Definition: log.c:583
fr_log_lvl_t
Definition: log.h:67
@ L_DBG
Only displayed when debugging is enabled.
Definition: log.h:59
static fr_event_update_t pause_read[]
Definition: master.c:155
static fr_event_update_t resume_read[]
Definition: master.c:160
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
unsigned char uint8_t
Definition: merged_model.c:30
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
fr_message_t * fr_message_localize(TALLOC_CTX *ctx, fr_message_t *m, size_t message_size)
Localize a message by copying it to local storage.
Definition: message.c:242
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition: message.c:934
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
fr_message_t * fr_message_alloc_reserve(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size, size_t leftover, size_t reserve_size)
Allocate packet data for a message, and reserve a new message.
Definition: message.c:1077
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition: message.h:51
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
@ FR_MESSAGE_USED
Definition: message.h:39
@ FR_MESSAGE_LOCALIZED
Definition: message.h:40
fr_message_status_t status
free, used, done, etc.
Definition: message.h:45
int fr_nonblock(UNUSED int fd)
Definition: misc.c:293
static const conf_parser_t config[]
Definition: base.c:183
static rc_stats_t stats
Definition: radclient-ng.c:74
#define DEBUG2(fmt,...)
Definition: radclient.h:43
static fr_app_io_t app_io
Definition: radius1_test.c:146
uint32_t fr_rand(void)
Return a 32-bit random number.
Definition: rand.c:106
void * fr_rb_iter_next_inorder(fr_rb_iter_inorder_t *iter)
Return the next node.
Definition: rb.c:850
void * fr_rb_iter_init_inorder(fr_rb_iter_inorder_t *iter, fr_rb_tree_t *tree)
Initialise an in-order iterator.
Definition: rb.c:824
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition: rb.h:246
int fr_rb_flatten_inorder(TALLOC_CTX *ctx, void **out[], fr_rb_tree_t *tree)
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Iterator structure for in-order traversal of an rbtree.
Definition: rb.h:321
The main red black tree structure.
Definition: rb.h:73
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
static char const * name
static char buff[sizeof("18446744073709551615")+3]
Definition: size_tests.c:41
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
Definition: log.h:96
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:772
#define talloc_get_type_abort_const
Definition: talloc.h:282
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:575
static fr_time_delta_t fr_time_delta_add(fr_time_delta_t a, fr_time_delta_t b)
Definition: time.h:255
#define fr_time_delta_lt(_a, _b)
Definition: time.h:285
#define fr_time_wrap(_time)
Definition: time.h:145
#define fr_time_delta_ispos(_a)
Definition: time.h:290
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
close(uq->fd)
static fr_event_list_t * el
static fr_slen_t parent
Definition: pair.h:851
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition: strerror.c:733
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1265
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition: worker.c:1605
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:1632
A worker which takes packets from a master, and processes them.
Definition: worker.c:94