The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
worker.c
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17  /**
18  * $Id: 3b3d974aad70419f1679ce2598ae633ae7162179 $
19  *
20  * @brief Worker thread functions.
21  * @file io/worker.c
22  *
23  * The "worker" thread is the one responsible for the bulk of the
24  * work done when processing a request. Workers are spawned by the
25  * scheduler, and create a kqueue (KQ) and an atomic
26  * queue (AQ) for control-plane communication.
27  *
28  * When a network thread discovers that it needs more workers, it
29  * asks the scheduler for a KQ/AQ combination. The network thread
30  * then creates a channel dedicated to that worker, and sends the
31  * channel to the worker in a "new channel" message. The worker
32  * receives the channel, and sends an ACK back to the network thread.
33  *
34  * The network thread then sends the worker new packets, which the
35  * worker receives and processes.
36  *
37  * When a packet is decoded, it is put into the "runnable" heap, and
38  * also into the "time_order" heap. The main loop fr_worker() then
39  * pulls new requests off of this heap and runs them. The
40  * worker_check_timeouts() function also checks the tail of the
41  * "time_order" heap, and ages out requests which have been active
42  * for "too long".
43  *
44  * If a request is yielded, it is placed onto the yielded list in
45  * the worker "tracking" data structure.
46  *
47  * @copyright 2016 Alan DeKok (aland@freeradius.org)
48  */
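
To make this concrete: a scheduler or network thread brings a worker online and opens a channel to it roughly as follows. This is an illustrative sketch, not from this file; ctx, el, log, config and master_control are assumed locals, and the argument order of fr_worker_channel_create() follows its documentation comment near the end of this file.

	worker = fr_worker_create(ctx, el, "worker-0", log, L_DBG_LVL_1, &config);
	if (!worker) return -1;

	/* Creating the channel also sends the "new channel" message to the worker. */
	ch = fr_worker_channel_create(worker, master_control, ctx);
	if (!ch) return -1;
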
49 
50 RCSID("$Id: 3b3d974aad70419f1679ce2598ae633ae7162179 $")
51 
52 #define LOG_PREFIX worker->name
53 #define LOG_DST worker->log
54 
55 #include <freeradius-devel/io/channel.h>
56 #include <freeradius-devel/io/listen.h>
57 #include <freeradius-devel/io/message.h>
58 #include <freeradius-devel/io/worker.h>
59 #include <freeradius-devel/unlang/base.h>
60 #include <freeradius-devel/unlang/call.h>
61 #include <freeradius-devel/unlang/interpret.h>
62 #include <freeradius-devel/server/request.h>
63 #include <freeradius-devel/server/time_tracking.h>
64 #include <freeradius-devel/util/dlist.h>
65 #include <freeradius-devel/util/minmax_heap.h>
66 
67 #include <stdalign.h>
68 
69 #ifdef WITH_VERIFY_PTR
70 static void worker_verify(fr_worker_t *worker);
71 #define WORKER_VERIFY worker_verify(worker)
72 #else
73 #define WORKER_VERIFY
74 #endif
75 
76 #define CACHE_LINE_SIZE 64
78 
79 static _Thread_local fr_ring_buffer_t *fr_worker_rb;
80 
81 typedef struct {
82  fr_channel_t *ch; //!< the channel
83 
84  /*
85  * To save time, we don't care about num_elements here. Which means that we don't
86  * need to cache or lookup the fr_worker_listen_t when we free a request.
87  */
88  fr_dlist_head_t dlist; //!< list of requests associated with this channel
89 } fr_worker_channel_t;
90 
91 /**
92  * A worker which takes packets from a master, and processes them.
93  */
94 struct fr_worker_s {
95  char const *name; //!< name of this worker
96  fr_worker_config_t config; //!< external configuration
97 
98  unlang_interpret_t *intp; //!< Worker's local interpreter.
99 
100  pthread_t thread_id; //!< my thread ID
101 
102  fr_log_t const *log; //!< log destination
103  fr_log_lvl_t lvl; //!< log level
104 
105  fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
106 
107  fr_control_t *control; //!< the control plane
108 
109  fr_event_list_t *el; //!< our event list
110 
111  int num_channels; //!< actual number of channels
112 
113  fr_heap_t *runnable; //!< current runnable requests which we've spent time processing
114  fr_minmax_heap_t *time_order; //!< time ordered heap of requests
115  fr_rb_tree_t *dedup; //!< de-dup tree
116 
117  fr_rb_tree_t *listeners; //!< so we can cancel requests when a listener goes away
118 
119  fr_io_stats_t stats; //!< input / output stats
120  fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
121  fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
122 
123  uint64_t num_naks; //!< number of messages which were nak'd
124  uint64_t num_active; //!< number of active requests
125 
126  fr_time_delta_t predicted; //!< How long we predict a request will take to execute.
127  fr_time_tracking_t tracking; //!< how much time the worker has spent doing things.
128 
129  bool was_sleeping; //!< used to suppress multiple sleep signals in a row
130  bool exiting; //!< are we exiting?
131 
132  fr_time_t checked_timeout; //!< when we last checked the tails of the queues
133 
134  fr_event_timer_t const *ev_cleanup; //!< timer for max_request_time
135 
136  fr_worker_channel_t *channel; //!< list of channels
137 };
138 
139 typedef struct {
140  fr_listen_t const *listener; //!< incoming packets
141 
142  fr_rb_node_t node; //!< in tree of listeners
143 
144  /*
145  * To save time, we don't care about num_elements here. Which means that we don't
146  * need to cache or lookup the fr_worker_listen_t when we free a request.
147  */
148  fr_dlist_head_t dlist; //!< of requests associated with this listener.
149 } fr_worker_listen_t;
150 
151 
152 static int8_t worker_listener_cmp(void const *one, void const *two)
153 {
154  fr_worker_listen_t const *a = one, *b = two;
155 
156  return CMP(a->listener, b->listener);
157 }
158 
159 
160 /*
161  * Explicitly cleanup the memory allocated to the ring buffer,
162  * just in case valgrind complains about it.
163  */
164 static int _fr_worker_rb_free(void *arg)
165 {
166  return talloc_free(arg);
167 }
168 
169 /** Initialise thread local storage
170  *
171  * @return fr_ring_buffer_t for messages
172  */
173 static fr_ring_buffer_t *fr_worker_rb_init(void)
174 {
175  fr_ring_buffer_t *rb;
176 
177  rb = fr_worker_rb;
178  if (rb) return rb;
179 
180  rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
181  if (!rb) {
182  fr_perror("Failed allocating memory for worker ring buffer");
183  return NULL;
184  }
185 
186  fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
187 
188  return rb;
189 }
190 
191 static inline bool is_worker_thread(fr_worker_t const *worker)
192 {
193  return (pthread_equal(pthread_self(), worker->thread_id) != 0);
194 }
195 
196 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
197 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
198 static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx);
199 static void worker_max_request_timer(fr_worker_t *worker);
200 
201 /** Callback which handles a message being received on the worker side.
202  *
203  * @param[in] ctx the worker
204  * @param[in] ch the channel to drain
205  * @param[in] cd the message (if any) to start with
206  */
207 static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
208 {
209  fr_worker_t *worker = ctx;
210 
211  worker->stats.in++;
212  DEBUG3("Received request %" PRIu64 "", worker->stats.in);
213  cd->channel.ch = ch;
214  worker_request_bootstrap(worker, cd, fr_time());
215 }
216 
217 static void worker_requests_cancel(fr_worker_channel_t *ch)
218 {
219  request_t *request;
220 
221  while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
222  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
223  }
224 }
225 
226 static void worker_exit(fr_worker_t *worker)
227 {
228  worker->exiting = true;
229 
230  /*
231  * Don't allow the post event to run
232  * any more requests. They'll be
233  * signalled to stop before we exit.
234  *
235  * This only has an effect in single
236  * threaded mode.
237  */
238  (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
239 }
240 
241 /** Handle a control plane message sent to the worker via a channel
242  *
243  * @param[in] ctx the worker
244  * @param[in] data the message
245  * @param[in] data_size size of the data
246  * @param[in] now the current time
247  */
248 static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
249 {
250  int i;
251  bool ok, was_sleeping;
252  fr_channel_t *ch;
253  fr_message_set_t *ms;
254  fr_channel_event_t ce;
255  fr_worker_t *worker = ctx;
256 
257  was_sleeping = worker->was_sleeping;
258  worker->was_sleeping = false;
259 
260  /*
261  * We were woken up by a signal to do something. We're
262  * not sleeping.
263  */
264  ce = fr_channel_service_message(now, &ch, data, data_size);
265  DEBUG3("Channel %s",
266  fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
267  switch (ce) {
268  case FR_CHANNEL_ERROR:
269  return;
270 
271  case FR_CHANNEL_EMPTY:
272  return;
273 
274  case FR_CHANNEL_NOOP:
275  return;
276 
276 
277  case FR_CHANNEL_DATA_READY_REQUESTOR:
278  fr_assert(0 == 1);
279  break;
280 
281  case FR_CHANNEL_DATA_READY_RESPONDER:
282  fr_assert(ch != NULL);
283 
284  if (!fr_channel_recv_request(ch)) {
285  worker->was_sleeping = was_sleeping;
286 
287  } else while (fr_channel_recv_request(ch));
288  break;
289 
290  case FR_CHANNEL_OPEN:
291  fr_assert(ch != NULL);
292 
293  ok = false;
294  for (i = 0; i < worker->config.max_channels; i++) {
295  fr_assert(worker->channel[i].ch != ch);
296 
297  if (worker->channel[i].ch != NULL) continue;
298 
299  worker->channel[i].ch = ch;
300  fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
301 
302  DEBUG3("Received channel %p into array entry %d", ch, i);
303 
304  ms = fr_message_set_create(worker, worker->config.message_set_size,
305  sizeof(fr_channel_data_t),
306  worker->config.ring_buffer_size);
307  fr_assert(ms != NULL);
308  fr_channel_responder_uctx_add(ch, ms);
309 
310  worker->num_channels++;
311  ok = true;
312  break;
313  }
314 
315  fr_cond_assert(ok);
316  break;
317 
318  case FR_CHANNEL_CLOSE:
319  fr_assert(ch != NULL);
320 
321  ok = false;
322 
323  /*
324  * Locate the signalling channel in the list
325  * of channels.
326  */
327  for (i = 0; i < worker->config.max_channels; i++) {
328  if (!worker->channel[i].ch) continue;
329 
330  if (worker->channel[i].ch != ch) continue;
331 
332  worker_requests_cancel(&worker->channel[i]);
333 
335 
337  "Network added messages to channel after sending FR_CHANNEL_CLOSE");
338 
339  ms = fr_channel_responder_uctx_get(ch);
340  fr_assert(ms != NULL);
341  fr_message_set_gc(ms);
342  talloc_free(ms);
343 
344  worker->channel[i].ch = NULL;
345 
346  fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
347  fr_assert(worker->num_channels > 0);
348 
349  worker->num_channels--;
350  ok = true;
351  break;
352  }
353 
354  fr_cond_assert(ok);
355 
356  /*
357  * Our last input channel closed,
358  * time to die.
359  */
360  if (worker->num_channels == 0) worker_exit(worker);
361  break;
362  }
363 }
364 
365 int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
366 {
367  fr_worker_listen_t *wl;
368  request_t *request;
369 
370  wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
371  if (!wl) return -1;
372 
373  while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
374  RDEBUG("Canceling request due to socket being closed");
375  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
376  }
377 
378  (void) fr_rb_delete(worker->listeners, wl);
379  talloc_free(wl);
380 
381  return 0;
382 }
383 
384 
385 /** A socket is going away, so clean up any requests which use this socket.
386  *
387  * @param[in] ctx the worker
388  * @param[in] data the message
389  * @param[in] data_size size of the data
390  * @param[in] now the current time
391  */
392 static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
393 {
394  fr_listen_t const *li;
395  fr_worker_t *worker = ctx;
396 
397  fr_assert(data_size == sizeof(li));
398 
399  memcpy(&li, data, sizeof(li));
400 
401  (void) fr_worker_listen_cancel_self(worker, li);
402 }
403 
404 /** Send a NAK to the network thread
405  *
406  * The network thread believes that a worker is running a request until that request has been NAK'd.
407  * We typically NAK requests when they've been hanging around in the worker's backlog too long,
408  * or there was an error executing the request.
409  *
410  * @param[in] worker the worker
411  * @param[in] cd the message to NAK
412  * @param[in] now when the message is NAKd
413  */
414 static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
415 {
416  size_t size;
417  fr_channel_data_t *reply;
418  fr_channel_t *ch;
419  fr_message_set_t *ms;
420  fr_listen_t *listen;
421 
422  worker->num_naks++;
423 
424  /*
425  * Cache the outbound channel. We'll need it later.
426  */
427  ch = cd->channel.ch;
428  listen = cd->listen;
429 
430  /*
431  * If the channel has been closed, but we haven't
432  * been informed, that is extremely bad.
433  *
434  * Try to continue working... but we'll likely
435  * leak memory or SEGV soon.
436  */
437  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
438  fr_message_done(&cd->m);
439  return;
440  }
441 
442  ms = fr_channel_responder_uctx_get(ch);
443  fr_assert(ms != NULL);
444 
445  size = listen->app_io->default_reply_size;
446  if (!size) size = listen->app_io->default_message_size;
447 
448  /*
449  * Allocate a default message size.
450  */
451  reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
452  fr_assert(reply != NULL);
453 
454  /*
455  * Encode a NAK
456  */
457  if (listen->app_io->nak) {
458  size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
459  cd->m.data_size, reply->m.data, reply->m.rb_size);
460  } else {
461  size = 1; /* rely on them to figure it the heck out */
462  }
463 
464  (void) fr_message_alloc(ms, &reply->m, size);
465 
466  /*
467  * Fill in the NAK.
468  */
469  reply->m.when = now;
470  reply->reply.cpu_time = worker->tracking.running_total;
471  reply->reply.processing_time = fr_time_delta_from_sec(10); /* @todo - set to something better? */
472  reply->reply.request_time = cd->request.recv_time;
473 
474  reply->listen = cd->listen;
475  reply->packet_ctx = cd->packet_ctx;
476 
477  /*
478  * Mark the original message as done.
479  */
480  fr_message_done(&cd->m);
481 
482  /*
483  * Send the reply, which also polls the request queue.
484  */
485  if (fr_channel_send_reply(ch, reply) < 0) {
486  DEBUG2("Failed sending reply to channel");
487  }
488 
489  worker->stats.out++;
490 }
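
The shape of the app_io nak callback invoked above can be inferred from its call site in worker_nak(). A minimal protocol handler might simply echo the request back; this is a hypothetical sketch (my_proto_nak and the exact prototype are assumptions based on that call, not a documented API):

static size_t my_proto_nak(UNUSED fr_listen_t *li, UNUSED void *packet_ctx,
			   uint8_t *data, size_t data_size,
			   uint8_t *reply, size_t reply_size)
{
	size_t len = (data_size < reply_size) ? data_size : reply_size;

	memcpy(reply, data, len);	/* echo the original packet back as the NAK */
	return len;
}
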
491 
492 /** Signal the unlang interpreter that it needs to stop running the request
493  *
494  * Signalling is a synchronous operation. Whatever I/O requests the request
495  * is currently performing are immediately cancelled, and all the frames are
496  * popped off the unlang stack.
497  *
498  * Modules and unlang keywords explicitly register signal handlers to deal
499  * with their yield points being cancelled/interrupted via this function.
500  *
501  * The caller should assume the request is no longer viable after calling
502  * this function.
503  *
504  * @param[in] request_p Pointer to the request to cancel.
505  * Will be set to NULL.
506  */
507 static void worker_stop_request(request_t **request_p)
508 {
509  /*
510  * Also marks the request as done and runs
511  * the internal/external callbacks.
512  */
513  unlang_interpret_signal(*request_p, FR_SIGNAL_CANCEL);
514  *request_p = NULL;
515 }
516 
517 /** Enforce max_request_time
518  *
519  * Runs periodically, and tries to clean up requests which were received by the network
520  * thread more than max_request_time seconds ago. In the interest of not adding a
521  * timer for every packet, the requests are given a 1 second leeway.
522  *
523  * @param[in] el the worker's event list
524  * @param[in] when the current time
525  * @param[in] uctx the fr_worker_t.
526  */
527 static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
528 {
529  fr_time_t now = fr_time();
530  request_t *request;
531  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
532 
533  /*
534  * Look at the oldest requests, and see if they need to
535  * be deleted.
536  */
537  while ((request = fr_minmax_heap_max_pop(worker->time_order)) != NULL) {
538  fr_time_t cleanup;
539 
540  REQUEST_VERIFY(request);
541 
542  cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);
543  if (fr_time_gt(cleanup, now)) break;
544 
545  /*
546  * Waiting too long, delete it.
547  */
548  REDEBUG("Request has reached max_request_time - signalling it to stop");
549  worker_stop_request(&request);
550  }
551 
552  /*
553  * Reset the max request timer.
554  */
555  worker_max_request_timer(worker);
556 }
557 
558 /** See when we next need to service the time_order heap for "too old" packets
559  *
560  * Inserts a timer into the event list which will trigger when the packet that
561  * was received longest ago would become older than max_request_time.
562  */
563 static void worker_max_request_timer(fr_worker_t *worker)
564 {
565  fr_time_t cleanup;
566 
567 
568  /*
569  * No more requests, delete the timer.
570  */
571  request = fr_minmax_heap_max_peek(worker->time_order);
572  if (!request) return;
573 
574  cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);
575 
576  DEBUG2("Resetting cleanup timer to +%pV", fr_box_time_delta(worker->config.max_request_time));
577  if (fr_event_timer_at(worker, worker->el, &worker->ev_cleanup,
578  cleanup, worker_max_request_time, worker) < 0) {
579  ERROR("Failed inserting max_request_time timer");
580  }
581 }
582 
583 /** Start time tracking for a request, and mark it as runnable.
584  *
585  */
586 static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
587 {
588  /*
589  * New requests are inserted into the time order heap in
590  * strict time priority. Once they are in the list, they
591  * are only removed when the request is done / free'd.
592  */
593  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
594  (void) fr_minmax_heap_insert(worker->time_order, request);
595 
596  /*
597  * Bootstrap the async state machine with the initial
598  * state of the request.
599  */
600  RDEBUG3("Time tracking started in yielded state");
601  fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
602  fr_time_tracking_yield(&request->async->tracking, now);
603  worker->num_active++;
604 
605  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
606  (void) fr_heap_insert(&worker->runnable, request);
607 
608  if (!worker->ev_cleanup) worker_max_request_timer(worker);
609 }
610 
611 static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
612 {
613  RDEBUG3("Time tracking ended");
614  fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
615  fr_assert(worker->num_active > 0);
616  worker->num_active--;
617 
618  if (fr_minmax_heap_entry_inserted(request->time_order_id)) (void) fr_minmax_heap_extract(worker->time_order, request);
619 }
620 
621 /** Send a response packet to the network side
622  *
623  * @param[in] worker This worker.
624  * @param[in] request we're sending a reply for.
625  * @param[in] send_reply whether the network side sends a reply
626  * @param[in] now The current time
627  */
628 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
629 {
630  fr_channel_data_t *reply;
631  fr_channel_t *ch;
632  fr_message_set_t *ms;
633  size_t size = 1;
634 
635  REQUEST_VERIFY(request);
636 
637  /*
638  * If we're sending a reply, then it's no longer runnable.
639  */
640  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
641 
642  if (send_reply) {
643  size = request->async->listen->app_io->default_reply_size;
644  if (!size) size = request->async->listen->app_io->default_message_size;
645  }
646 
647  /*
648  * Allocate and send the reply.
649  */
650  ch = request->async->channel;
651  fr_assert(ch != NULL);
652 
653  /*
654  * If the channel has been closed, but we haven't
655  * been informed, that is extremely bad.
656  *
657  * Try to continue working... but we'll likely
658  * leak memory or SEGV soon.
659  */
660  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
661  return;
662  }
663 
664  ms = fr_channel_responder_uctx_get(ch);
665  fr_assert(ms != NULL);
666 
667  reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
668  fr_assert(reply != NULL);
669 
670  /*
671  * Encode it, if required.
672  */
673  if (send_reply) {
674  ssize_t slen = 0;
675  fr_listen_t const *listen = request->async->listen;
676 
677  if (listen->app_io->encode) {
678  slen = listen->app_io->encode(listen->app_io_instance, request,
679  reply->m.data, reply->m.rb_size);
680  } else if (listen->app->encode) {
681  slen = listen->app->encode(listen->app_instance, request,
682  reply->m.data, reply->m.rb_size);
683  }
684  if (slen < 0) {
685  RPERROR("Failed encoding request");
686  *reply->m.data = 0;
687  slen = 1;
688  }
689 
690  /*
691  * Shrink the buffer to the actual packet size.
692  *
693  * This will ALWAYS return the same message as we put in.
694  */
695  fr_assert((size_t) slen <= reply->m.rb_size);
696  (void) fr_message_alloc(ms, &reply->m, slen);
697  }
698 
699  /*
700  * Fill in the rest of the fields in the channel message.
701  *
702  * sequence / ack will be filled in by fr_channel_send_reply()
703  */
704  reply->m.when = now;
705  reply->reply.cpu_time = worker->tracking.running_total;
706  reply->reply.processing_time = request->async->tracking.running_total;
707  reply->reply.request_time = request->async->recv_time;
708 
709  reply->listen = request->async->listen;
710  reply->packet_ctx = request->async->packet_ctx;
711 
712  /*
713  * Update the various timers.
714  */
715  fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
716  fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);
717 
718  RDEBUG("Finished request");
719 
720  /*
721  * Send the reply, which also polls the request queue.
722  */
723  if (fr_channel_send_reply(ch, reply) < 0) {
724  /*
725  * Should only happen if the TO_REQUESTOR
726  * channel is full, or it's not yet active.
727  *
728  * Not much we can do except complain
729  * loudly and cleanup the request.
730  */
731  RPERROR("Failed sending reply to network thread");
732  }
733 
734  worker->stats.out++;
735 
736  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
737  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
738 
739  fr_dlist_entry_unlink(&request->listen_entry);
740 
741 #ifndef NDEBUG
742  request->async->el = NULL;
743  request->async->channel = NULL;
744  request->async->packet_ctx = NULL;
745  request->async->listen = NULL;
746 #endif
747 }
748 
749 /*
750  * talloc_typed_asprintf() is horrifically slow for printing
751  * simple numbers.
752  */
753 static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
754 {
755  char buffer[32];
756  char *p;
757  char const *numbers = "0123456789";
758 
759  p = buffer + 30;
760  *(p--) = '\0';
761 
762  while (number > 0) {
763  *(p--) = numbers[number % 10];
764  number /= 10;
765  }
766 
767  if (p[1]) return talloc_strdup(ctx, p + 1);
768 
769  return talloc_strdup(ctx, "0");
770 }
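
Usage is straightforward (illustrative values):

	name = itoa_internal(request, 12345);	/* -> "12345" */
	name = itoa_internal(request, 0);	/* -> "0" (the digit loop never runs) */
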
771 
772 /** Initialize various request fields needed by the worker.
773  *
774  */
775 static inline CC_HINT(always_inline)
776 void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
777 {
778  /*
779  * For internal requests request->packet
780  * and request->reply are already populated.
781  */
782  if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
783  if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
784 
785  request->packet->timestamp = now;
786  request->async = talloc_zero(request, fr_async_t);
787  request->async->recv_time = now;
788  request->async->el = worker->el;
789  fr_dlist_entry_init(&request->async->entry);
790 }
791 
792 static inline CC_HINT(always_inline)
793 void worker_request_name_number(request_t *request)
794 {
796  if (request->name) talloc_const_free(request->name);
797  request->name = itoa_internal(request, request->number);
798 }
799 
800 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
801 {
802  int ret = -1;
803  request_t *request;
804  TALLOC_CTX *ctx;
805  fr_listen_t const *listen;
806 
807  if (fr_minmax_heap_num_elements(worker->time_order) >= (uint32_t) worker->config.max_requests) goto nak;
808 
809  ctx = request = request_alloc_external(NULL, NULL);
810  if (!request) goto nak;
811 
812  worker_request_init(worker, request, now);
813  worker_request_name_number(request);
814 
815  /*
816  * Associate our interpreter with the request
817  */
818  unlang_interpret_set(request, worker->intp);
819 
820  request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */
821 
822  /*
823  * Receive a message to the worker queue, and decode it
824  * to a request.
825  */
826  fr_assert(cd->listen != NULL);
827 
828  /*
829  * Update the transport-specific fields.
830  */
831  request->async->channel = cd->channel.ch;
832 
833  request->async->recv_time = cd->request.recv_time;
834 
835  request->async->listen = cd->listen;
836  request->async->packet_ctx = cd->packet_ctx;
837  listen = request->async->listen;
838 
839  /*
840  * Now that the "request" structure has been initialized, go decode the packet.
841  *
842  * Note that this also sets the "async process" function.
843  */
844  if (listen->app->decode) {
845  ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
846  } else if (listen->app_io->decode) {
847  ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
848  }
849 
850  if (ret < 0) {
851  talloc_free(ctx);
852 nak:
853  worker_nak(worker, cd, now);
854  return;
855  }
856 
857  /*
858  * Set the entry point for this virtual server.
859  */
860  if (unlang_call_push(request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
861  RERROR("Protocol failed to set 'process' function");
862  worker_nak(worker, cd, now);
863  return;
864  }
865 
866  /*
867  * We're done with this message.
868  */
869  fr_message_done(&cd->m);
870 
871  /*
872  * Look for conflicting / duplicate packets, but only if
873  * requested to do so.
874  */
875  if (request->async->listen->track_duplicates) {
876  request_t *old;
877 
878  old = fr_rb_find(worker->dedup, request);
879  if (!old) {
880  goto insert_new;
881  }
882 
883  fr_assert(old->async->listen == request->async->listen);
884  fr_assert(old->async->channel == request->async->channel);
885 
886  /*
887  * There's a new packet. Do we keep the old one,
888  * or the new one? This decision is made by
889  * checking the recv_time, which is a
890  * nanosecond-resolution timer. If the time is
891  * identical, then the new packet is the same as
892  * the old one.
893  *
894  * If the new packet is a duplicate of the old
895  * one, then we can just discard the new one. We
896  * have to tell the channel that we've "eaten"
897  * this reply, so the sequence number should
898  * increase.
899  *
900  * @todo - fix the channel code to do queue
901  * depth, and not sequence / ack.
902  */
903  if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
904  RWARN("Discarding duplicate of request (%"PRIu64")", old->number);
905 
906  fr_channel_null_reply(request->async->channel);
907  talloc_free(request);
908 
909  /*
910  * Signal there's a dup, and ignore the
911  * return code. We don't bother replying
912  * here, as an FD event or timer will
913  * wake up the request, and cause it to
914  * continue.
915  *
916  * @todo - the old request is NOT
917  * running, but is yielded. It MAY clean
918  * itself up, or do something...
919  */
920  unlang_interpret_signal(old, FR_SIGNAL_DUP);
921  worker->stats.dup++;
922  return;
923  }
924 
925  /*
926  * Stop the old request, and decrement the number
927  * of active requests.
928  */
929  RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);
930 
931  worker_stop_request(&old);
932  worker->stats.dropped++;
933 
934  insert_new:
935  (void) fr_rb_insert(worker->dedup, request);
936  }
937 
938  worker_request_time_tracking_start(worker, request, now);
939 
940  {
941  fr_worker_listen_t *wl;
942 
943  wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
944  if (!wl) {
945  MEM(wl = talloc_zero(worker, fr_worker_listen_t));
946  fr_dlist_init(&wl->dlist, request_t, listen_entry);
947  wl->listener = listen;
948 
949  (void) fr_rb_insert(worker->listeners, wl);
950  }
951 
952  fr_dlist_insert_tail(&wl->dlist, request);
953  }
954 }
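
To summarise, the dedup logic above produces one of three outcomes:

/*
 * No tree entry for (listen, packet_ctx)  -> insert the new request and run it.
 * Entry exists, recv_time identical       -> duplicate: free the new request and
 *                                            signal FR_SIGNAL_DUP to the old one.
 * Entry exists, recv_time differs         -> conflict: stop the old request, then
 *                                            insert and run the new one.
 */
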
955 
956 /**
957  * Track a request_t in the "runnable" heap.
958  */
959 static int8_t worker_runnable_cmp(void const *one, void const *two)
960 {
961  request_t const *a = one, *b = two;
962  int ret;
963 
964  ret = CMP(a->async->priority, b->async->priority);
965  if (ret != 0) return ret;
966 
967  ret = CMP(a->async->sequence, b->async->sequence);
968  if (ret != 0) return ret;
969 
970  return fr_time_cmp(a->async->recv_time, b->async->recv_time);
971 }
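
Since fr_heap_pop() returns the element that compares smallest, this ordering means the lowest priority value runs first, then the lowest sequence number, with recv_time only breaking remaining ties. For example:

/*
 * { priority = 1, sequence = 9 } pops before { priority = 2, sequence = 3 }
 * { priority = 1, sequence = 3 } pops before { priority = 1, sequence = 9 }
 */
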
972 
973 /**
974  * Track a request_t in the "time_order" heap.
975  */
976 static int8_t worker_time_order_cmp(void const *one, void const *two)
977 {
978  request_t const *a = one, *b = two;
979 
980  return fr_time_cmp(a->async->recv_time, b->async->recv_time);
981 }
982 
983 /**
984  * Track a request_t in the "dedup" tree
985  */
986 static int8_t worker_dedup_cmp(void const *one, void const *two)
987 {
988  int ret;
989  request_t const *a = one, *b = two;
990 
991  ret = CMP(a->async->listen, b->async->listen);
992  if (ret) return ret;
993 
994  return CMP(a->async->packet_ctx, b->async->packet_ctx);
995 }
996 
997 /** Destroy a worker
998  *
999  * The input channels are signaled, and local messages are cleaned up.
1000  *
1001  * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
1002  * error has occurred on the worker side, and we need to destroy it.
1003  *
1004  * We signal all pending requests in the backlog to stop, and tell the
1005  * network side that it should not send us any more requests.
1006  *
1007  * @param[in] worker the worker to destroy.
1008  */
1009 void fr_worker_destroy(fr_worker_t *worker)
1010 {
1011  int i, count;
1012  request_t *request;
1013 
1014 // WORKER_VERIFY;
1015 
1016  /*
1017  * Stop any new requests running with this interpreter
1018  */
1019  unlang_interpret_set_thread_default(NULL);
1020 
1021  /*
1022  * Destroy all of the active requests. These are ones
1023  * which are still waiting for timers or file descriptor
1024  * events.
1025  */
1026  count = 0;
1027  while ((request = fr_minmax_heap_min_peek(worker->time_order)) != NULL) {
1028  if (count < 10) {
1029  DEBUG("Worker is exiting - telling request %s to stop", request->name);
1030  count++;
1031  }
1032  worker_stop_request(&request);
1033  }
1034  fr_assert(fr_heap_num_elements(worker->runnable) == 0);
1035 
1036  /*
1037  * Signal the channels that we're closing.
1038  *
1039  * The other end owns the channel, and will take care of
1040  * popping messages in the TO_RESPONDER queue, and marking
1041  * them FR_MESSAGE_DONE. It will ignore the messages in
1042  * the TO_REQUESTOR queue, as we own those. They will be
1043  * automatically freed when our talloc context is freed.
1044  */
1045  for (i = 0; i < worker->config.max_channels; i++) {
1046  if (!worker->channel[i].ch) continue;
1047 
1048  worker_requests_cancel(&worker->channel[i]);
1049 
1051  "Pending messages in channel after cancelling request");
1052 
1053  fr_channel_responder_ack_close(worker->channel[i].ch);
1054  }
1055 
1056  talloc_free(worker);
1057 }
1058 
1059 /** Initialise an internal request (i.e. one generated by the interpreter)
1060  *
1061  */
1062 static void _worker_request_internal_init(request_t *request, void *uctx)
1063 {
1064  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1065  fr_time_t now = fr_time();
1066 
1067  worker_request_init(worker, request, now);
1068 
1069  /*
1070  * Requests generated by the interpreter
1071  * are always marked up as internal.
1072  */
1073  fr_assert(request_is_internal(request));
1074  worker_request_time_tracking_start(worker, request, now);
1075 }
1076 
1077 
1078 /** External request is now complete
1079  *
1080  */
1081 static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1082 {
1083  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1084  fr_time_t now = fr_time();
1085 
1086  /*
1087  * All external requests MUST have a listener.
1088  */
1089  fr_assert(request_is_external(request));
1090  fr_assert(request->async->listen != NULL);
1091 
1092  /*
1093  * Only real packets are in the dedup tree. And even
1094  * then, only some of the time.
1095  */
1096  if (request->async->listen->track_duplicates) {
1097  (void) fr_rb_delete(worker->dedup, request);
1098  }
1099 
1100  /*
1101  * If we're running a real request, then the final
1102  * indentation MUST be zero. Otherwise we skipped
1103  * something!
1104  *
1105  * Also check that the request is NOT marked as
1106  * "yielded", but is in fact done.
1107  *
1108  * @todo - check that the stack is at frame 0, otherwise
1109  * more things have gone wrong.
1110  */
1111  fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
1112  "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1114  "Request %s is marked as yielded at end of processing", request->name);
1116  "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
1117  RDEBUG("Done request");
1118 
1119  /*
1120  * The request is done. Track that.
1121  */
1122  worker_request_time_tracking_end(worker, request, now);
1123 
1124  /*
1125  * Remove it from the list of requests associated with this channel.
1126  */
1127  if (fr_dlist_entry_in_list(&request->async->entry)) {
1128  fr_dlist_entry_unlink(&request->async->entry);
1129  }
1130 
1131  /*
1132  * These conditions are true when the server is
1133  * exiting and we're stopping all the requests.
1134  *
1135  * This should never happen otherwise.
1136  */
1137  if (unlikely((request->master_state == REQUEST_STOP_PROCESSING) &&
1138  !fr_channel_active(request->async->channel))) {
1139  talloc_free(request);
1140  return;
1141  }
1142 
1143  worker_send_reply(worker, request, request->master_state != REQUEST_STOP_PROCESSING, now);
1144  talloc_free(request);
1145 }
1146 
1147 /** Internal request (i.e. one generated by the interpreter) is now complete
1148  *
1149  * Whatever generated the request is now responsible for freeing it.
1150  */
1151 static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1152 {
1153  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1154 
1155  worker_request_time_tracking_end(worker, request, fr_time());
1156 
1157  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1158  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
1159  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1160 }
1161 
1162 /** Detached request (i.e. one generated by the interpreter with no parent) is now complete
1163  *
1164  * As the request has no parent, there's nothing else to free it,
1165  * so we have to.
1166  */
1167 static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1168 {
1169  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1170 
1171  /*
1172  * No time tracking for detached requests
1173  * so we don't need to call
1174  * worker_request_time_tracking_end.
1175  */
1176  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1177 
1178  /*
1179  * Normally worker_request_time_tracking_end
1180  * would remove the request from the time
1181  * order heap, but we need to do that for
1182  * detached requests.
1183  */
1184  (void)fr_minmax_heap_extract(worker->time_order, request);
1185 
1186  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1187 
1188  /*
1189  * Detached requests have to be freed by us
1190  * as nothing else can free them.
1191  *
1192  * All other requests must be freed by the
1193  * code which allocated them.
1194  */
1195  talloc_free(request);
1196 }
1197 
1198 
1199 /** Make us responsible for running the request
1200  *
1201  */
1202 static void _worker_request_detach(request_t *request, void *uctx)
1203 {
1204  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1205 
1206  if (request_is_detachable(request)) {
1207  /*
1208  * End the time tracking... We don't track detached requests,
1209  * because they don't contribute for the time consumed by an
1210  * external request.
1211  */
1212  if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1213  RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1214  fr_time_tracking_resume(&request->async->tracking, fr_time());
1215  }
1216  worker_request_time_tracking_end(worker, request, fr_time());
1217 
1218  if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1219 
1220  RDEBUG3("Request is detached");
1221  } else {
1222  fr_assert_msg(0, "Request is not detachable");
1223  }
1224 
1225  return;
1226 }
1227 
1228 /** This is called by the interpreter when it wants to stop a request
1229  *
1230  * The idea is to get the request into the same state it would be in
1231  * if the interpreter had just finished with it.
1232  */
1233 static void _worker_request_stop(request_t *request, void *uctx)
1234 {
1235  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1236 
1237  RDEBUG3("Cleaning up request execution state");
1238 
1239  /*
1240  * Make sure time tracking is always in a
1241  * consistent state when we mark the request
1242  * as done.
1243  */
1244  if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1245  RDEBUG3("Forcing time tracking to running state, from yielded, for request stop");
1246  fr_time_tracking_resume(&request->async->tracking, fr_time());
1247  }
1248 
1249  /*
1250  * If the request is in the runnable queue
1251  * yank it back out, so it's not "runnable"
1252  * when we call request done.
1253  */
1254  if (fr_heap_entry_inserted(request->runnable_id)) fr_heap_extract(&worker->runnable, request);
1255 }
1256 
1257 /** Request is now runnable
1258  *
1259  */
1260 static void _worker_request_runnable(request_t *request, void *uctx)
1261 {
1262  fr_worker_t *worker = uctx;
1263 
1264  RDEBUG3("Request marked as runnable");
1265  fr_heap_insert(&worker->runnable, request);
1266 }
1267 
1268 /** Interpreter yielded request
1269  *
1270  */
1271 static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1272 {
1273  RDEBUG3("Request yielded");
1274  fr_time_tracking_yield(&request->async->tracking, fr_time());
1275 }
1276 
1277 /** Interpreter is starting to work on request again
1278  *
1279  */
1280 static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1281 {
1282  RDEBUG3("Request resuming");
1283  fr_time_tracking_resume(&request->async->tracking, fr_time());
1284 }
1285 
1286 /** Check if a request is scheduled
1287  *
1288  */
1289 static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1290 {
1291  return fr_heap_entry_inserted(request->runnable_id);
1292 }
1293 
1294 /** Run a request
1295  *
1296  * Until it either yields, or is done.
1297  *
1298  * This function is also responsible for sending replies, and
1299  * cleaning up the request.
1300  *
1301  * @param[in] worker the worker
1302  * @param[in] start the current time
1303  */
1304 static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
1305 {
1306  request_t *request;
1307  fr_time_t now;
1308 
1309  WORKER_VERIFY;
1310 
1311  now = start;
1312 
1313  /*
1314  * Busy-loop running requests for 1ms. We still poll the
1315  * event loop 1000 times a second, OR when there's no
1316  * more work to do. This allows us to make progress with
1317  * ongoing requests, at the expense of sometimes ignoring
1318  * new ones.
1319  */
1320  while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
1321  ((request = fr_heap_pop(&worker->runnable)) != NULL)) {
1322 
1323  REQUEST_VERIFY(request);
1324  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1325 
1326  /*
1327  * For real requests, if the channel is gone,
1328  * just stop the request and free it.
1329  */
1330  if (request->async->channel && !fr_channel_active(request->async->channel)) {
1331  worker_stop_request(&request);
1332  return;
1333  }
1334 
1335  (void)unlang_interpret(request);
1336 
1337  now = fr_time();
1338  }
1339 }
1340 
1341 /** Create a worker
1342  *
1343  * @param[in] ctx the talloc context
1344  * @param[in] name the name of this worker
1345  * @param[in] el the event list
1346  * @param[in] logger the destination for all logging messages
1347  * @param[in] lvl log level
1348  * @param[in] config various configuration parameters
1349  * @return
1350  * - NULL on error
1351  * - fr_worker_t on success
1352  */
1353 fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
1354  fr_worker_config_t *config)
1355 {
1356  fr_worker_t *worker;
1357 
1358  worker = talloc_zero(ctx, fr_worker_t);
1359  if (!worker) {
1360 nomem:
1361  fr_strerror_const("Failed allocating memory");
1362  return NULL;
1363  }
1364 
1365  worker->name = talloc_strdup(worker, name); /* thread locality */
1366 
1367  unlang_thread_instantiate(worker);
1368 
1369  if (config) worker->config = *config;
1370 
1371 #define CHECK_CONFIG(_x, _min, _max) do { \
1372  if (!worker->config._x) worker->config._x = _min; \
1373  if (worker->config._x < _min) worker->config._x = _min; \
1374  if (worker->config._x > _max) worker->config._x = _max; \
1375  } while (0)
1376 
1377 #define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1378  if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1379  if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1380  } while (0)
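
Each check below clamps a configuration knob into a sane range, treating zero (unset) as "use the minimum". For example (illustrative values):

/*
 * CHECK_CONFIG(max_requests, 1024, (1 << 30)):
 *	0 (unset)	-> 1024
 *	100		-> 1024
 *	2000000000	-> (1 << 30)
 */
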
1381 
1382  CHECK_CONFIG(max_requests,1024,(1 << 30));
1383  CHECK_CONFIG(max_channels, 64, 1024);
1384  CHECK_CONFIG(talloc_pool_size, 4096, 65536);
1385  CHECK_CONFIG(message_set_size, 1024, 8192);
1386  CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
1388 
1389  worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
1390  if (!worker->channel) {
1391  talloc_free(worker);
1392  goto nomem;
1393  }
1394 
1395  worker->thread_id = pthread_self();
1396  worker->el = el;
1397  worker->log = logger;
1398  worker->lvl = lvl;
1399 
1400  /*
1401  * The worker thread starts now. Manually initialize it,
1402  * because we're tracking request time, not the time that
1403  * the worker thread is running.
1404  */
1405  memset(&worker->tracking, 0, sizeof(worker->tracking));
1406 
1407  worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
1408  if (!worker->aq_control) {
1409  fr_strerror_const("Failed creating atomic queue");
1410  fail:
1411  talloc_free(worker);
1412  return NULL;
1413  }
1414 
1415  worker->control = fr_control_create(worker, el, worker->aq_control);
1416  if (!worker->control) {
1417  fr_strerror_const_push("Failed creating control plane");
1418  goto fail;
1419  }
1420 
1421  if (fr_control_callback_add(worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
1422  fr_strerror_const_push("Failed adding control channel");
1423  goto fail;
1424  }
1425 
1426  if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN, worker, worker_listen_cancel_callback) < 0) {
1427  fr_strerror_const_push("Failed adding callback for listeners");
1428  goto fail;
1429  }
1430 
1431  worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0);
1432  if (!worker->runnable) {
1433  fr_strerror_const("Failed creating runnable heap");
1434  goto fail;
1435  }
1436 
1437  worker->time_order = fr_minmax_heap_talloc_alloc(worker, worker_time_order_cmp, request_t, time_order_id, 0);
1438  if (!worker->time_order) {
1439  fr_strerror_const("Failed creating time_order heap");
1440  goto fail;
1441  }
1442 
1443  worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
1444  if (!worker->dedup) {
1445  fr_strerror_const("Failed creating de_dup tree");
1446  goto fail;
1447  }
1448 
1449  worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
1450  if (!worker->listeners) {
1451  fr_strerror_const("Failed creating listener tree");
1452  goto fail;
1453  }
1454 
1455  worker->intp = unlang_interpret_init(worker, el,
1456  &(unlang_request_func_t){
1457  .init_internal = _worker_request_internal_init,
1458 
1459  .done_external = _worker_request_done_external,
1460  .done_internal = _worker_request_done_internal,
1461  .done_detached = _worker_request_done_detached,
1462 
1463  .detach = _worker_request_detach,
1464  .stop = _worker_request_stop,
1465  .yield = _worker_request_yield,
1466  .resume = _worker_request_resume,
1467  .mark_runnable = _worker_request_runnable,
1468 
1469  .scheduled = _worker_request_scheduled
1470  },
1471  worker);
1472  if (!worker->intp){
1473  fr_strerror_const("Failed initialising interpreter");
1474  goto fail;
1475  }
1476  unlang_interpret_set_thread_default(worker->intp);
1477 
1478  return worker;
1479 }
1480 
1481 
1482 /** The main loop and entry point of the stand-alone worker thread.
1483  *
1484  * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
1485  * instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
1486  * request.
1487  *
1488  * @param[in] worker the worker data structure to manage
1489  */
1490 void fr_worker(fr_worker_t *worker)
1491 {
1492  WORKER_VERIFY;
1493 
1494  while (true) {
1495  bool wait_for_event;
1496  int num_events;
1497 
1498  WORKER_VERIFY;
1499 
1500  /*
1501  * There are runnable requests. We still service
1502  * the event loop, but we don't wait for events.
1503  */
1504  wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
1505  if (wait_for_event) {
1506  if (worker->exiting && (fr_minmax_heap_num_elements(worker->time_order) == 0)) break;
1507 
1508  DEBUG4("Ready to process requests");
1509  }
1510 
1511  /*
1512  * Check the event list. If there's an error
1513  * (e.g. exit), we stop looping and clean up.
1514  */
1515  DEBUG3("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1516  num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
1517  if (num_events < 0) {
1518  PERROR("Failed retrieving events");
1519  break;
1520  }
1521 
1522  DEBUG3("%u event(s) pending%s",
1523  num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1524 
1525  /*
1526  * Service outstanding events.
1527  */
1528  if (num_events > 0) {
1529  DEBUG4("Servicing event(s)");
1530  fr_event_service(worker->el);
1531  }
1532 
1533  /*
1534  * Run any outstanding requests.
1535  */
1536  worker_run_request(worker, fr_time());
1537  }
1538 }
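
A stand-alone worker thread therefore reduces to creating the worker on its own event list and calling fr_worker(). A sketch of the thread wrapper (the pthread glue and names here are assumptions, not from this file):

static void *worker_thread_entry(void *arg)
{
	fr_worker_t *worker = arg;

	fr_worker(worker);		/* loops until the last channel closes */
	fr_worker_destroy(worker);	/* stop anything still outstanding */

	return NULL;
}
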
1539 
1540 /** Pre-event handler
1541  *
1542  * This should be run ONLY in single-threaded mode!
1543  */
1544 int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
1545 {
1546  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1547  request_t *request;
1548 
1549  request = fr_heap_peek(worker->runnable);
1550  if (!request) return 0;
1551 
1552  /*
1553  * There's work to do. Tell the event handler to poll
1554  * for IO / timers, but also immediately return to the
1555  * calling function, which has more work to do.
1556  */
1557  return 1;
1558 }
1559 
1560 
1561 /** Post-event handler
1562  *
1563  * This should be run ONLY in single-threaded mode!
1564  */
1565 void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
1566 {
1567  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1568 
1569  worker_run_request(worker, fr_time()); /* Event loop time can be too old, and trigger asserts */
1570 }
1571 
1572 /** Print debug information about the worker structure
1573  *
1574  * @param[in] worker the worker
1575  * @param[in] fp the file where the debug output is printed.
1576  */
1577 void fr_worker_debug(fr_worker_t *worker, FILE *fp)
1578 {
1579  WORKER_VERIFY;
1580 
1581  fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
1582  fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);
1583 
1584  fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
1585  fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
1586  fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
1587  fr_time_delta_unwrap(worker->tracking.running_total) / worker->stats.in);
1588 
1589  fr_time_tracking_debug(&worker->tracking, fp);
1590 
1591 }
1592 
1593 /** Create a channel to the worker
1594  *
1595  * Called by the master (i.e. network) thread when it needs to create
1596  * a new channel to a particular worker.
1597  *
1598  * @param[in] worker the worker
1599  * @param[in] master the control plane of the master
1600  * @param[in] ctx the context in which the channel will be created
1601  */
1602 fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, fr_control_t *master, TALLOC_CTX *ctx)
1603 {
1604  fr_channel_t *ch;
1605  pthread_t id;
1606  bool same;
1607 
1608  WORKER_VERIFY;
1609 
1610  id = pthread_self();
1611  same = (pthread_equal(id, worker->thread_id) != 0);
1612 
1613  ch = fr_channel_create(ctx, master, worker->control, same);
1614  if (!ch) return NULL;
1615 
1616  (void) fr_channel_set_recv_request(ch, worker, worker_recv_request);
1617 
1618  /*
1619  * Tell the worker about the channel
1620  */
1621  if (fr_channel_signal_open(ch) < 0) {
1622  talloc_free(ch);
1623  return NULL;
1624  }
1625 
1626  return ch;
1627 }
1628 
1629 int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
1630 {
1631  fr_ring_buffer_t *rb;
1632 
1633  /*
1634  * Skip a bunch of work if we're already in the worker thread.
1635  */
1636  if (is_worker_thread(worker)) {
1637  return fr_worker_listen_cancel_self(worker, li);
1638  }
1639 
1640  rb = fr_worker_rb_init();
1641  if (!rb) return -1;
1642 
1643  return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
1644 }
1645 
1646 #ifdef WITH_VERIFY_PTR
1647 /** Verify the worker data structures.
1648  *
1649  * @param[in] worker the worker
1650  */
1651 static void worker_verify(fr_worker_t *worker)
1652 {
1653  int i;
1654 
1655  (void) talloc_get_type_abort(worker, fr_worker_t);
1656  fr_atomic_queue_verify(worker->aq_control);
1657 
1658  fr_assert(worker->control != NULL);
1659  (void) talloc_get_type_abort(worker->control, fr_control_t);
1660 
1661  fr_assert(worker->el != NULL);
1662  (void) talloc_get_type_abort(worker->el, fr_event_list_t);
1663 
1664  fr_assert(worker->runnable != NULL);
1665  (void) talloc_get_type_abort(worker->runnable, fr_heap_t);
1666 
1667  fr_assert(worker->dedup != NULL);
1668  (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
1669 
1670  for (i = 0; i < worker->config.max_channels; i++) {
1671  if (!worker->channel[i].ch) continue;
1672 
1673  (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
1674  }
1675 }
1676 #endif
1677 
1678 int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
1679 {
1680  if (num < 0) return -1;
1681  if (num == 0) return 0;
1682 
1683  stats[0] = worker->stats.in;
1684  if (num >= 2) stats[1] = worker->stats.out;
1685  if (num >= 3) stats[2] = worker->stats.dup;
1686  if (num >= 4) stats[3] = worker->stats.dropped;
1687  if (num >= 5) stats[4] = worker->num_naks;
1688  if (num >= 6) stats[5] = worker->num_active;
1689 
1690  if (num <= 6) return num;
1691 
1692  return 6;
1693 }
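
A caller gathers the six counters exposed here as follows (sketch):

	uint64_t stats[6];
	int n;

	n = fr_worker_stats(worker, 6, stats);
	/* n == 6; stats[0..5] = in, out, dup, dropped, naks, active */
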
1694 
1695 static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
1696 {
1697  fr_worker_t const *worker = ctx;
1698  fr_time_delta_t when;
1699 
1700  if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
1701  fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
1702  fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
1703  fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
1704  fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
1705  fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
1706  fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
1707  fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
1708  }
1709 
1710  if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
1711  when = worker->predicted;
1712  fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1713 
1714  when = worker->tracking.running_total;
1715  if (fr_time_delta_ispos(when)) when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
1716  fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1717 
1718  when = worker->tracking.running_total;
1719  fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1720 
1721  when = worker->tracking.waiting_total;
1722  fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1723 
1724  fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
1725  fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
1726  }
1727 
1728  return 0;
1729 }
1730 
1731 fr_cmd_table_t cmd_worker_table[] = {
1732  {
1733  .parent = "stats",
1734  .name = "worker",
1735  .help = "Statistics for worker threads.",
1736  .read_only = true
1737  },
1738 
1739  {
1740  .parent = "stats worker",
1741  .add_name = true,
1742  .name = "self",
1743  .syntax = "[(count|cpu)]",
1744  .func = cmd_stats_worker,
1745  .help = "Show statistics for a specific worker thread.",
1746  .read_only = true
1747  },
1748 
1749  CMD_TABLE_END
1750 };
static int const char char buffer[256]
Definition: acutest.h:574
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition: app_io.h:55
size_t default_reply_size
same for replies
Definition: app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition: app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition: app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition: app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition: application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition: application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition: atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define atomic_uint64_t
Definition: atomic_queue.h:42
#define RCSID(id)
Definition: build.h:444
#define NDEBUG_UNUSED
Definition: build.h:324
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define unlikely(_x)
Definition: build.h:378
#define UNUSED
Definition: build.h:313
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition: call.c:147
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition: channel.c:183
fr_table_num_sorted_t const channel_signals[]
Definition: channel.c:153
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition: channel.c:897
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition: channel.c:938
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition: channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition: channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition: channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition: channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition: channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition: channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition: channel.c:952
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_EMPTY
Definition: channel.h:75
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_ERROR
Definition: channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition: channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition: channel.h:146
Channel information which is added to a message.
Definition: channel.h:104
int argc
current argument count
Definition: command.h:39
char const * parent
e.g. "show module"
Definition: command.h:52
#define CMD_TABLE_END
Definition: command.h:62
char const ** argv
text version of commands
Definition: command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition: control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define FR_CONTROL_ID_LISTEN
Definition: control.h:57
#define FR_CONTROL_MAX_SIZE
Definition: control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition: control.h:50
static fr_ring_buffer_t * rb
Definition: control_test.c:51
#define fr_cond_assert(_x)
Calls panic_action if NDEBUG is not defined, else logs the error and evaluates to the value of _x.
Definition: debug.h:137
#define fr_assert_msg(_x, _msg,...)
Calls panic_action if NDEBUG is not defined, else logs the error and causes the server to exit immediately with code...
Definition: debug.h:208
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action if NDEBUG is not defined, else logs the error and evaluates to the value of _x.
Definition: debug.h:154
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition: dlist.h:672
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition: dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition: dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition: dlist.h:138
Head of a doubly linked list.
Definition: dlist.h:51
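The dlist entries above are the primitive behind the per-channel and per-listener request lists. A self-contained sketch; example_req_t and example_drain() are hypothetical, only the fr_dlist_* calls come from the list above:

#include <freeradius-devel/util/dlist.h>

typedef struct {
    fr_dlist_t entry;   /* linkage, named in fr_dlist_init() below */
    int id;
} example_req_t;

static void example_drain(void)
{
    fr_dlist_head_t head;
    example_req_t a = { .id = 1 }, b = { .id = 2 };
    example_req_t *req;

    fr_dlist_init(&head, example_req_t, entry);
    fr_dlist_entry_init(&a.entry);
    fr_dlist_entry_init(&b.entry);

    fr_dlist_insert_tail(&head, &a);
    fr_dlist_insert_tail(&head, &b);

    /* Drain from the head, e.g. to cancel everything attached to a
     * dying channel. */
    while ((req = fr_dlist_pop_head(&head)) != NULL) {
        /* ... act on req ... */
    }
}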
#define fr_event_timer_at(...)
Definition: event.h:250
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition: heap.c:322
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition: heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition: heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition: heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition: heap.h:115
The main heap structure.
Definition: heap.h:66
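The heap entries above are what the worker's "runnable" queue is built on. A hedged sketch of the idiom: the talloc'd element carries an fr_heap_index_t so the heap can track it. example_item_t and example_cmp() are hypothetical stand-ins for request_t and worker_runnable_cmp():

#include <talloc.h>
#include <freeradius-devel/util/heap.h>

typedef struct {
    fr_heap_index_t idx;   /* maintained by the heap */
    int priority;
} example_item_t;

static int8_t example_cmp(void const *one, void const *two)
{
    example_item_t const *a = one, *b = two;

    return CMP(a->priority, b->priority);
}

static void example_heap_use(TALLOC_CTX *ctx)
{
    fr_heap_t *h;
    example_item_t *item;

    h = fr_heap_talloc_alloc(ctx, example_cmp, example_item_t, idx, 0);
    if (!h) return;

    item = talloc_zero(h, example_item_t);
    item->priority = 1;
    (void) fr_heap_insert(&h, item);   /* note: takes fr_heap_t ** */

    if (fr_heap_peek(h)) (void) fr_heap_pop(&h);
}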
rlm_rcode_t unlang_interpret(request_t *request)
Run the interpreter for a current request.
Definition: interpret.c:760
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition: interpret.c:1726
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition: interpret.c:1263
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition: interpret.c:1757
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize an unlang compiler / interpreter.
Definition: interpret.c:1684
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition: interpret.c:1184
bool unlang_interpret_is_resumable(request_t *request)
Check if a request is resumable.
Definition: interpret.c:1322
#define UNLANG_TOP_FRAME
Definition: interpret.h:35
External functions provided by the owner of the interpreter.
Definition: interpret.h:100
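The interpreter entries above describe how a thread gets its own unlang interpreter. A minimal sketch assuming only the signatures listed here; the contents of the funcs table (the owner-supplied callbacks) are deliberately elided:

#include <freeradius-devel/unlang/interpret.h>

static unlang_interpret_t *example_intp_init(TALLOC_CTX *ctx, fr_event_list_t *el,
                                             unlang_request_func_t *funcs, void *uctx)
{
    unlang_interpret_t *intp;

    intp = unlang_interpret_init(ctx, el, funcs, uctx);
    if (!intp) return NULL;

    /* Requests created on this thread now default to this
     * interpreter. */
    unlang_interpret_set_thread_default(intp);

    return intp;
}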
uint64_t out
Definition: base.h:43
uint64_t dup
Definition: base.h:44
uint64_t dropped
Definition: base.h:45
uint64_t in
Definition: base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition: control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
The control structure.
Definition: control.c:79
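The control-plane entries above pair an event list with an atomic queue, then dispatch messages by ID. A hedged sketch; example_channel_callback() is hypothetical, though its signature mirrors worker_channel_callback() listed further down:

#include <talloc.h>
#include <freeradius-devel/io/control.h>

static void example_channel_callback(void *ctx, void const *data,
                                     size_t data_size, fr_time_t now);

static fr_control_t *example_control_init(TALLOC_CTX *ctx, fr_event_list_t *el,
                                          fr_atomic_queue_t *aq, void *uctx)
{
    fr_control_t *control;

    control = fr_control_create(ctx, el, aq);
    if (!control) return NULL;

    /* Ask for "new channel" messages to be delivered to us. */
    if (fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL,
                                uctx, example_channel_callback) < 0) {
        talloc_free(control);
        return NULL;
    }

    return control;
}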
void const * app_instance
Definition: listen.h:38
fr_app_t const * app
Definition: listen.h:37
void const * app_io_instance
I/O path configuration context.
Definition: listen.h:32
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition: listen.h:40
fr_app_io_t const * app_io
I/O path functions.
Definition: listen.h:31
Minimal data structure to use the new code.
Definition: listen.h:58
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define RWARN(fmt,...)
Definition: log.h:297
#define RERROR(fmt,...)
Definition: log.h:298
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RPERROR(fmt,...)
Definition: log.h:302
#define RPEDEBUG(fmt,...)
Definition: log.h:376
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition: event.c:2328
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition: event.c:2542
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition: event.c:2407
talloc_free(reap)
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
fr_log_lvl_t
Definition: log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition: packet.c:38
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition: message.c:934
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition: message.c:1238
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition: message.h:51
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
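The message-set entries above implement a reserve-then-allocate pattern for packet buffers. A hedged sketch; the sizes passed to fr_message_set_create() are illustrative, not the worker's configured defaults:

#include <freeradius-devel/io/channel.h>   /* for fr_channel_data_t */
#include <freeradius-devel/io/message.h>

static fr_message_set_t *example_ms_create(TALLOC_CTX *ctx)
{
    /* 1024 message headers, fr_channel_data_t-sized slots, and 64KB
     * of ring buffer. */
    return fr_message_set_create(ctx, 1024, sizeof(fr_channel_data_t), 64 * 1024);
}

static fr_message_t *example_reply_alloc(fr_message_set_t *ms, size_t packet_len)
{
    fr_message_t *m;

    /* Reserve the most we might need... */
    m = fr_message_reserve(ms, 4096);
    if (!m) return NULL;

    /* ...then shrink the claim to what was actually written. */
    return fr_message_alloc(ms, m, packet_len);
}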
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:424
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:449
void * fr_minmax_heap_max_pop(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:477
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:466
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
Definition: minmax_heap.c:533
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:486
static bool fr_minmax_heap_entry_inserted(fr_minmax_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition: minmax_heap.h:93
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition: minmax_heap.h:85
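The minmax-heap entries above are the basis of the "time_order" structure: with a reversed time comparator the oldest request sits at the max end, so aging out only ever touches one side of the heap. A hedged sketch; example_entry_t and both helpers are hypothetical:

#include <freeradius-devel/util/minmax_heap.h>
#include <freeradius-devel/util/time.h>

typedef struct {
    fr_minmax_heap_index_t idx;
    fr_time_t recv_time;
} example_entry_t;

/* Reversed, so older entries sort as "greater" and end up at the
 * max end of the heap. */
static int8_t example_time_cmp(void const *one, void const *two)
{
    example_entry_t const *a = one, *b = two;

    return fr_time_cmp(b->recv_time, a->recv_time);
}

static void example_age_out(fr_minmax_heap_t *hp, fr_time_t now, fr_time_delta_t max_age)
{
    example_entry_t *oldest;

    while ((oldest = fr_minmax_heap_max_peek(hp)) != NULL) {
        /* Stop at the first entry which is still young enough. */
        if (fr_time_delta_lt(fr_time_sub(now, oldest->recv_time), max_age)) break;

        (void) fr_minmax_heap_max_pop(hp);
        /* ... stop / clean up the popped entry here ... */
    }
}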
static const conf_parser_t config[]
Definition: base.c:188
static rc_stats_t stats
Definition: radclient-ng.c:72
#define REDEBUG(fmt,...)
Definition: radclient.h:52
#define RDEBUG(fmt,...)
Definition: radclient.h:53
#define DEBUG2(fmt,...)
Definition: radclient.h:43
static void send_reply(int sockfd, fr_channel_data_t *reply)
Definition: radius1_test.c:190
static bool cleanup
Definition: radsniff.c:60
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition: rb.c:624
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove a node and free its data (if a free function was specified).
Definition: rb.c:736
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition: rb.c:576
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree that verifies elements are of a specific talloc type.
Definition: rb.h:246
The main red black tree structure.
Definition: rb.h:73
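The red-black tree entries above back the worker's "listeners" index, which exists so requests can be cancelled when their socket dies. A hedged sketch; example_listen_t and example_listen_cmp() are hypothetical:

#include <talloc.h>
#include <freeradius-devel/util/rb.h>

typedef struct {
    fr_rb_node_t node;   /* inline tree linkage */
    void const *key;
} example_listen_t;

static int8_t example_listen_cmp(void const *one, void const *two)
{
    example_listen_t const *a = one, *b = two;

    return CMP(a->key, b->key);
}

static void example_rb_use(TALLOC_CTX *ctx)
{
    fr_rb_tree_t *tree;
    example_listen_t *wl, find = { .key = NULL };

    tree = fr_rb_inline_talloc_alloc(ctx, example_listen_t, node,
                                     example_listen_cmp, NULL);
    if (!tree) return;

    wl = talloc_zero(tree, example_listen_t);
    wl->key = wl;
    (void) fr_rb_insert(tree, wl);

    /* Lookups use a stack "template" carrying only the key. */
    find.key = wl->key;
    if (fr_rb_find(tree, &find)) (void) fr_rb_delete(tree, wl);
}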
rlm_rcode_t
Return codes indicating the result of the module call.
Definition: rcode.h:40
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition: request.c:664
#define REQUEST_VERIFY(_x)
Definition: request.h:275
#define request_is_detached(_x)
Definition: request.h:160
#define request_is_external(_x)
Definition: request.h:158
#define request_is_internal(_x)
Definition: request.h:159
#define request_is_detachable(_x)
Definition: request.h:161
#define request_alloc_external(_ctx, _args)
Allocate a new external request.
Definition: request.h:294
@ REQUEST_STOP_PROCESSING
Definition: request.h:62
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
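fr_ring_buffer_create() plus the fr_atexit_thread_local() macro listed earlier form the thread-local ring buffer pattern used for control-plane sends (cf. fr_worker_rb and _fr_worker_rb_free below). A hedged sketch; the include paths, the 64KB size, and the assumption that the macro assigns the value and registers the destructor are all taken on trust:

#include <talloc.h>
#include <freeradius-devel/util/atexit.h>
#include <freeradius-devel/util/ring_buffer.h>   /* path assumed */

static _Thread_local fr_ring_buffer_t *example_rb;

static int _example_rb_free(void *arg)
{
    return talloc_free(arg);
}

static fr_ring_buffer_t *example_rb_init(void)
{
    fr_ring_buffer_t *rb;

    if (example_rb) return example_rb;   /* already initialised */

    rb = fr_ring_buffer_create(NULL, 64 * 1024);
    if (!rb) return NULL;

    /* Free the buffer automatically when this thread exits. */
    fr_atexit_thread_local(example_rb, _example_rb_free, rb);

    return rb;
}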
static char const * name
return count
Definition: module.c:175
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
@ memory_order_seq_cst
Definition: stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:302
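These two stdatomic wrappers are how the worker's monotonic request counter (request_number, below) is advanced. A tiny sketch, assuming atomic_uint64_t and the wrapped atomics are pulled in via the atomic_queue.h header listed earlier:

#include <stdint.h>
#include <freeradius-devel/io/atomic_queue.h>   /* defines atomic_uint64_t */

static atomic_uint64_t example_counter;

/* Atomically claim the next request number, with sequentially
 * consistent ordering. */
static uint64_t example_next_number(void)
{
    return atomic_fetch_add_explicit(&example_counter, 1, memory_order_seq_cst) + 1;
}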
Definition: log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:253
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:212
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition: time.c:580
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition: time.c:625
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:573
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition: time.h:154
#define fr_time_delta_lt(_a, _b)
Definition: time.h:283
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition: time.h:588
#define fr_time_delta_wrap(_time)
Definition: time.h:152
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_eq(_a, _b)
Definition: time.h:241
#define NSEC
Definition: time.h:377
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition: time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:914
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
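The fr_time_t / fr_time_delta_t entries above are the arithmetic behind every timeout check in this file. A minimal worked example:

#include <stdbool.h>
#include <freeradius-devel/util/time.h>

/* True if the request has been active for at least max_age.
 * fr_time_sub() of two fr_time_t values yields an fr_time_delta_t. */
static bool example_too_old(fr_time_t recv_time, fr_time_t now, fr_time_delta_t max_age)
{
    return !fr_time_delta_lt(fr_time_sub(now, recv_time), max_age);
}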
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
Definition: time_tracking.h:44
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
Definition: time_tracking.h:80
fr_time_delta_t running_total
total time spent running
Definition: time_tracking.h:79
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
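The time-tracking entries above form a small state machine around each request. A hedged sketch of the lifecycle, using only the calls listed here; that the predicted argument to fr_time_tracking_end() feeds the worker's execution-time estimate is an inference from the field names below:

#include <freeradius-devel/server/time_tracking.h>

static void example_lifecycle(fr_time_tracking_t *worker_tt,
                              fr_time_tracking_t *tt,
                              fr_time_delta_t *predicted)
{
    fr_time_tracking_start(worker_tt, tt, fr_time());   /* begins running */
    fr_time_tracking_yield(tt, fr_time());              /* blocks on I/O, timer, etc. */
    fr_time_tracking_resume(tt, fr_time());             /* runnable again */
    fr_time_tracking_end(predicted, tt, fr_time());     /* done */
}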
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition: compile.c:4965
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition: strerror.c:733
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1259
#define fr_box_time_delta(_val)
Definition: value.h:336
static int8_t worker_time_order_cmp(void const *one, void const *two)
Track a request_t in the "time_order" heap.
Definition: worker.c:976
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition: worker.c:753
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition: worker.c:113
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition: worker.c:611
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition: worker.c:1271
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Detached request (i.e.
Definition: worker.c:1167
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition: worker.c:248
fr_event_list_t * el
our event list
Definition: worker.c:109
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition: worker.c:1544
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition: worker.c:628
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition: worker.c:117
fr_channel_t * ch
Definition: worker.c:82
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition: worker.c:1304
static void worker_exit(fr_worker_t *worker)
Definition: worker.c:226
#define WORKER_VERIFY
Definition: worker.c:73
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition: worker.c:1602
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition: worker.c:129
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition: worker.c:1695
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition: worker.c:1260
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition: worker.c:986
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
Definition: worker.c:507
fr_worker_channel_t * channel
list of channels
Definition: worker.c:136
char const * name
name of this worker
Definition: worker.c:95
uint64_t num_active
number of active requests
Definition: worker.c:124
fr_event_timer_t const * ev_cleanup
timer for max_request_time
Definition: worker.c:134
fr_cmd_table_t cmd_worker_table[]
Definition: worker.c:1731
static _Thread_local fr_ring_buffer_t * fr_worker_rb
Definition: worker.c:79
fr_minmax_heap_t * time_order
time ordered heap of requests
Definition: worker.c:114
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition: worker.c:1678
static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition: worker.c:586
fr_dlist_head_t dlist
Definition: worker.c:88
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition: worker.c:1280
fr_rb_tree_t * dedup
de-dup tree
Definition: worker.c:115
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition: worker.c:105
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition: worker.c:414
static void worker_request_name_number(request_t *request)
Definition: worker.c:793
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
Definition: worker.c:1233
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition: worker.c:800
fr_log_t const * log
log destination
Definition: worker.c:102
fr_io_stats_t stats
input / output stats
Definition: worker.c:119
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition: worker.c:1202
static int _fr_worker_rb_free(void *arg)
Definition: worker.c:164
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition: worker.c:127
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition: worker.c:173
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition: worker.c:1081
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition: worker.c:1009
uint64_t num_naks
number of messages which were nak'd
Definition: worker.c:123
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition: worker.c:776
fr_worker_config_t config
external configuration
Definition: worker.c:96
fr_listen_t const * listener
incoming packets
Definition: worker.c:140
unlang_interpret_t * intp
Worker's local interpreter.
Definition: worker.c:98
fr_time_t checked_timeout
when we last checked the tails of the queues
Definition: worker.c:132
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:365
bool exiting
are we exiting?
Definition: worker.c:130
fr_log_lvl_t lvl
log level
Definition: worker.c:103
static void worker_max_request_timer(fr_worker_t *worker)
See when we next need to service the time_order heap for "too old" packets.
Definition: worker.c:563
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition: worker.c:217
int num_channels
actual number of channels
Definition: worker.c:111
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition: worker.c:1353
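fr_worker_create(), fr_worker(), and fr_worker_destroy() are the public lifecycle of this file. A hedged sketch of how a scheduler-like caller might use them; the worker name, the debug level, and the create/run/destroy pairing within a single thread are illustrative assumptions:

#include <freeradius-devel/io/worker.h>

static void example_worker_thread(TALLOC_CTX *ctx, fr_event_list_t *el,
                                  fr_log_t const *log, fr_worker_config_t *config)
{
    fr_worker_t *worker;

    worker = fr_worker_create(ctx, el, "example-worker", log, L_DBG_LVL_1, config);
    if (!worker) return;

    fr_worker(worker);          /* main loop; returns when the worker exits */
    fr_worker_destroy(worker);  /* illustrative teardown */
}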
static atomic_uint64_t request_number
Definition: worker.c:77
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition: worker.c:120
fr_rb_node_t node
in tree of listeners
Definition: worker.c:142
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:1629
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition: worker.c:1565
fr_dlist_head_t dlist
of requests associated with this listener.
Definition: worker.c:148
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition: worker.c:1490
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition: worker.c:126
pthread_t thread_id
my thread ID
Definition: worker.c:100
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition: worker.c:207
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition: worker.c:121
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition: worker.c:152
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition: worker.c:527
static bool is_worker_thread(fr_worker_t const *worker)
Definition: worker.c:191
fr_control_t * control
the control plane
Definition: worker.c:107
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition: worker.c:1289
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition: worker.c:1151
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition: worker.c:1577
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition: worker.c:1062
#define CACHE_LINE_SIZE
Definition: worker.c:76
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition: worker.c:959
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition: worker.c:392
A worker which takes packets from a master, and processes them.
Definition: worker.c:94
int message_set_size
default start number of messages
Definition: worker.h:63
int max_requests
max requests this worker will handle
Definition: worker.h:59
int max_channels
maximum number of channels
Definition: worker.h:61
int ring_buffer_size
default start size for the ring buffers
Definition: worker.h:64
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition: worker.h:66