The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
worker.c
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17  /**
18  * $Id: eadb77c5c36479ee4283ee4647cc92cde9ff0736 $
19  *
20  * @brief Worker thread functions.
21  * @file io/worker.c
22  *
23  * The "worker" thread is responsible for the bulk of the
24  * work done when processing a request. Workers are spawned by the
25  * scheduler, and each worker creates a kqueue (KQ) and an atomic
26  * queue (AQ) for control-plane communication.
27  *
28  * When a network thread discovers that it needs more workers, it
29  * asks the scheduler for a KQ/AQ combination. The network thread
30  * then creates a channel dedicated to that worker, and sends the
31  * channel to the worker in a "new channel" message. The worker
32  * receives the channel, and sends an ACK back to the network thread.
33  *
34  * The network thread then sends the worker new packets, which the
35  * worker receives and processes.
36  *
37  * When a packet is decoded, it is put into the "runnable" heap, and
38  * also into the "time_order" heap. The main loop fr_worker() then
39  * pulls new requests off this heap and runs them. The
40  * worker_max_request_time() function also checks the oldest entries
41  * in the "time_order" heap, and ages out requests which have been
42  * active for "too long".
43  *
44  * If a request is yielded, it is placed onto the yielded list in
45  * the worker "tracking" data structure.
46  *
47  * @copyright 2016 Alan DeKok (aland@freeradius.org)
48  */
49 
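The lifecycle described above, reduced to a minimal sketch of the public API in this
file (fr_worker_create(), fr_worker_channel_create() and fr_worker()). This is
illustrative only: in the real server the scheduler spawns the worker thread and the
network thread creates the channel, and the log level and error handling here are
placeholder assumptions.

static void worker_lifecycle_sketch(TALLOC_CTX *ctx, fr_event_list_t *el,
                                    fr_log_t const *log, fr_control_t *network_ctl)
{
	fr_worker_t	*worker;
	fr_channel_t	*ch;

	/* Worker side: allocate the worker, its control plane and atomic queue */
	worker = fr_worker_create(ctx, el, "worker-0", log, L_DBG_LVL_1, NULL);
	if (!worker) return;

	/* Network side: create a channel dedicated to this worker. This sends
	 * the "new channel" message which the worker ACKs. */
	ch = fr_worker_channel_create(worker, ctx, network_ctl);
	if (!ch) return;

	/* Worker side: service events and run requests until the last
	 * channel closes and the worker exits. */
	fr_worker(worker);
}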
50 RCSID("$Id: eadb77c5c36479ee4283ee4647cc92cde9ff0736 $")
51 
52 #define LOG_PREFIX worker->name
53 #define LOG_DST worker->log
54 
55 #include <freeradius-devel/io/channel.h>
56 #include <freeradius-devel/io/listen.h>
57 #include <freeradius-devel/io/message.h>
58 #include <freeradius-devel/io/worker.h>
59 #include <freeradius-devel/unlang/base.h>
60 #include <freeradius-devel/unlang/call.h>
61 #include <freeradius-devel/unlang/interpret.h>
62 #include <freeradius-devel/server/request.h>
63 #include <freeradius-devel/server/time_tracking.h>
64 #include <freeradius-devel/util/dlist.h>
65 #include <freeradius-devel/util/minmax_heap.h>
66 
67 #include <stdalign.h>
68 
69 #ifdef WITH_VERIFY_PTR
70 static void worker_verify(fr_worker_t *worker);
71 #define WORKER_VERIFY worker_verify(worker)
72 #else
73 #define WORKER_VERIFY
74 #endif
75 
76 #define CACHE_LINE_SIZE 64
77 static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;
78 
79 static _Thread_local fr_ring_buffer_t *fr_worker_rb;
80 
81 typedef struct {
82  fr_channel_t *ch; //!< the channel
83 
84  /*
85  * To save time, we don't care about num_elements here. Which means that we don't
86  * need to cache or lookup the fr_worker_listen_t when we free a request.
87  */
88  fr_dlist_head_t dlist; //!< list of requests associated with this channel
89 } fr_worker_channel_t;
90 
91 /**
92  * A worker which takes packets from a master, and processes them.
93  */
94 struct fr_worker_s {
95  char const *name; //!< name of this worker
96  fr_worker_config_t config; //!< external configuration
97 
98  unlang_interpret_t *intp; //!< Worker's local interpreter.
99 
100  pthread_t thread_id; //!< my thread ID
101 
102  fr_log_t const *log; //!< log destination
103  fr_log_lvl_t lvl; //!< log level
104 
105  fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
106 
107  fr_control_t *control; //!< the control plane
108 
109  fr_event_list_t *el; //!< our event list
110 
111  int num_channels; //!< actual number of channels
112 
113  fr_heap_t *runnable; //!< current runnable requests which we've spent time processing
114  fr_minmax_heap_t *time_order; //!< time ordered heap of requests
115  fr_rb_tree_t *dedup; //!< de-dup tree
116 
117  fr_rb_tree_t *listeners; //!< so we can cancel requests when a listener goes away
118 
119  fr_io_stats_t stats; //!< input / output stats
120  fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
121  fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
122 
123  uint64_t num_naks; //!< number of messages which were nak'd
124  uint64_t num_active; //!< number of active requests
125 
126  fr_time_delta_t predicted; //!< How long we predict a request will take to execute.
127  fr_time_tracking_t tracking; //!< how much time the worker has spent doing things.
128 
129  bool was_sleeping; //!< used to suppress multiple sleep signals in a row
130  bool exiting; //!< are we exiting?
131 
132  fr_time_t checked_timeout; //!< when we last checked the tails of the queues
133 
134  fr_event_timer_t const *ev_cleanup; //!< timer for max_request_time
135 
136  fr_worker_channel_t *channel; //!< list of channels
137 };
138 
139 typedef struct {
140  fr_listen_t const *listener; //!< incoming packets
141 
142  fr_rb_node_t node; //!< in tree of listeners
143 
144  /*
145  * To save time, we don't care about num_elements here. Which means that we don't
146  * need to cache or lookup the fr_worker_listen_t when we free a request.
147  */
148  fr_dlist_head_t dlist; //!< of requests associated with this listener.
149 } fr_worker_listen_t;
150 
151 
152 static int8_t worker_listener_cmp(void const *one, void const *two)
153 {
154  fr_worker_listen_t const *a = one, *b = two;
155 
156  return CMP(a->listener, b->listener);
157 }
158 
159 
160 /*
161  * Explicitly cleanup the memory allocated to the ring buffer,
162  * just in case valgrind complains about it.
163  */
164 static int _fr_worker_rb_free(void *arg)
165 {
166  return talloc_free(arg);
167 }
168 
169 /** Initialise thread local storage
170  *
171  * @return fr_ring_buffer_t for messages
172  */
173 static fr_ring_buffer_t *fr_worker_rb_init(void)
174 {
175  fr_ring_buffer_t *rb;
176 
177  rb = fr_worker_rb;
178  if (rb) return rb;
179 
180  rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
181  if (!rb) {
182  fr_perror("Failed allocating memory for worker ring buffer");
183  return NULL;
184  }
185 
186  fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
187 
188  return rb;
189 }
190 
191 static inline bool is_worker_thread(fr_worker_t const *worker)
192 {
193  return (pthread_equal(pthread_self(), worker->thread_id) != 0);
194 }
195 
196 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
197 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
198 static void worker_requests_cancel(fr_worker_channel_t *ch);
199 static void worker_max_request_timer(fr_worker_t *worker);
200 
201 /** Callback which handles a message being received on the worker side.
202  *
203  * @param[in] ctx the worker
204  * @param[in] ch the channel to drain
205  * @param[in] cd the message (if any) to start with
206  */
207 static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
208 {
209  fr_worker_t *worker = ctx;
210 
211  worker->stats.in++;
212  DEBUG3("Received request %" PRIu64 "", worker->stats.in);
213  cd->channel.ch = ch;
214  worker_request_bootstrap(worker, cd, fr_time());
215 }
216 
217 static void worker_requests_cancel(fr_worker_channel_t *ch)
218 {
219  request_t *request;
220 
221  while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
222  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
223  }
224 }
225 
226 static void worker_exit(fr_worker_t *worker)
227 {
228  worker->exiting = true;
229 
230  /*
231  * Don't allow the post event to run
232  * any more requests. They'll be
233  * signalled to stop before we exit.
234  *
235  * This only has an effect in single
236  * threaded mode.
237  */
238  (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
239 }
240 
241 /** Handle a control plane message sent to the worker via a channel
242  *
243  * @param[in] ctx the worker
244  * @param[in] data the message
245  * @param[in] data_size size of the data
246  * @param[in] now the current time
247  */
248 static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
249 {
250  int i;
251  bool ok, was_sleeping;
252  fr_channel_t *ch;
253  fr_message_set_t *ms;
254  fr_channel_event_t ce;
255  fr_worker_t *worker = ctx;
256 
257  was_sleeping = worker->was_sleeping;
258  worker->was_sleeping = false;
259 
260  /*
261  * We were woken up by a signal to do something. We're
262  * not sleeping.
263  */
264  ce = fr_channel_service_message(now, &ch, data, data_size);
265  DEBUG3("Channel %s",
266  fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
267  switch (ce) {
268  case FR_CHANNEL_ERROR:
269  return;
270 
271  case FR_CHANNEL_EMPTY:
272  return;
273 
274  case FR_CHANNEL_NOOP:
275  return;
276 
277  case FR_CHANNEL_DATA_READY_REQUESTOR:
278  fr_assert(0 == 1);
279  break;
280 
281  case FR_CHANNEL_DATA_READY_RESPONDER:
282  fr_assert(ch != NULL);
283 
284  if (!fr_channel_recv_request(ch)) {
285  worker->was_sleeping = was_sleeping;
286 
287  } else while (fr_channel_recv_request(ch));
288  break;
289 
290  case FR_CHANNEL_OPEN:
291  fr_assert(ch != NULL);
292 
293  ok = false;
294  for (i = 0; i < worker->config.max_channels; i++) {
295  fr_assert(worker->channel[i].ch != ch);
296 
297  if (worker->channel[i].ch != NULL) continue;
298 
299  worker->channel[i].ch = ch;
300  fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
301 
302  DEBUG3("Received channel %p into array entry %d", ch, i);
303 
304  ms = fr_message_set_create(worker, worker->config.message_set_size,
305  sizeof(fr_channel_data_t),
306  worker->config.ring_buffer_size);
307  fr_assert(ms != NULL);
308  fr_channel_responder_uctx_add(ch, ms);
309 
310  worker->num_channels++;
311  ok = true;
312  break;
313  }
314 
315  fr_cond_assert(ok);
316  break;
317 
318  case FR_CHANNEL_CLOSE:
319  fr_assert(ch != NULL);
320 
321  ok = false;
322 
323  /*
324  * Locate the signalling channel in the list
325  * of channels.
326  */
327  for (i = 0; i < worker->config.max_channels; i++) {
328  if (!worker->channel[i].ch) continue;
329 
330  if (worker->channel[i].ch != ch) continue;
331 
332  worker_requests_cancel(&worker->channel[i]);
333 
335 
337  "Network added messages to channel after sending FR_CHANNEL_CLOSE");
338 
340  fr_assert(ms != NULL);
341  fr_message_set_gc(ms);
342  talloc_free(ms);
343 
344  worker->channel[i].ch = NULL;
345 
346  fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
347  fr_assert(worker->num_channels > 0);
348 
349  worker->num_channels--;
350  ok = true;
351  break;
352  }
353 
354  fr_cond_assert(ok);
355 
356  /*
357  * Our last input channel closed,
358  * time to die.
359  */
360  if (worker->num_channels == 0) worker_exit(worker);
361  break;
362  }
363 }
364 
365 static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
366 {
367  fr_worker_listen_t *wl;
368  request_t *request;
369 
370  wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
371  if (!wl) return -1;
372 
373  while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
374  RDEBUG("Canceling request due to socket being closed");
375  unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
376  }
377 
378  (void) fr_rb_delete(worker->listeners, wl);
379  talloc_free(wl);
380 
381  return 0;
382 }
383 
384 
385 /** A socket is going away, so clean up any requests which use this socket.
386  *
387  * @param[in] ctx the worker
388  * @param[in] data the message
389  * @param[in] data_size size of the data
390  * @param[in] now the current time
391  */
392 static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
393 {
394  fr_listen_t const *li;
395  fr_worker_t *worker = ctx;
396 
397  fr_assert(data_size == sizeof(li));
398 
399  memcpy(&li, data, sizeof(li));
400 
401  (void) fr_worker_listen_cancel_self(worker, li);
402 }
403 
404 /** Send a NAK to the network thread
405  *
406  * The network thread believes that a worker is running a request until that request has been NAK'd.
407  * We typically NAK requests when they've been hanging around in the worker's backlog too long,
408  * or there was an error executing the request.
409  *
410  * @param[in] worker the worker
411  * @param[in] cd the message to NAK
412  * @param[in] now when the message is NAKd
413  */
414 static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
415 {
416  size_t size;
417  fr_channel_data_t *reply;
418  fr_channel_t *ch;
419  fr_message_set_t *ms;
420  fr_listen_t *listen;
421 
422  worker->num_naks++;
423 
424  /*
425  * Cache the outbound channel. We'll need it later.
426  */
427  ch = cd->channel.ch;
428  listen = cd->listen;
429 
430  /*
431  * If the channel has been closed, but we haven't
432  * been informed, that is extremely bad.
433  *
434  * Try to continue working... but we'll likely
435  * leak memory or SEGV soon.
436  */
437  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
438  fr_message_done(&cd->m);
439  return;
440  }
441 
442  ms = fr_channel_responder_uctx_get(ch);
443  fr_assert(ms != NULL);
444 
445  size = listen->app_io->default_reply_size;
446  if (!size) size = listen->app_io->default_message_size;
447 
448  /*
449  * Allocate a default message size.
450  */
451  reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
452  fr_assert(reply != NULL);
453 
454  /*
455  * Encode a NAK
456  */
457  if (listen->app_io->nak) {
458  size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
459  cd->m.data_size, reply->m.data, reply->m.rb_size);
460  } else {
461  size = 1; /* rely on them to figure it the heck out */
462  }
463 
464  (void) fr_message_alloc(ms, &reply->m, size);
465 
466  /*
467  * Fill in the NAK.
468  */
469  reply->m.when = now;
470  reply->reply.cpu_time = worker->tracking.running_total;
471  reply->reply.processing_time = fr_time_delta_from_sec(10); /* @todo - set to something better? */
472  reply->reply.request_time = cd->request.recv_time;
473 
474  reply->listen = cd->listen;
475  reply->packet_ctx = cd->packet_ctx;
476 
477  /*
478  * Mark the original message as done.
479  */
480  fr_message_done(&cd->m);
481 
482  /*
483  * Send the reply, which also polls the request queue.
484  */
485  if (fr_channel_send_reply(ch, reply) < 0) {
486  DEBUG2("Failed sending reply to channel");
487  }
488 
489  worker->stats.out++;
490 }
491 
492 /** Signal the unlang interpreter that it needs to stop running the request
493  *
494  * Signalling is a synchronous operation. Whatever I/O requests the request
495  * is currently performing are immediately cancelled, and all the frames are
496  * popped off the unlang stack.
497  *
498  * Modules and unlang keywords explicitly register signal handlers to deal
499  * with their yield points being cancelled/interrupted via this function.
500  *
501  * The caller should assume the request is no longer viable after calling
502  * this function.
503  *
504  * @param[in] request_p Pointer to the request to cancel.
505  * Will be set to NULL.
506  */
507 static void worker_stop_request(request_t **request_p)
508 {
509  /*
510  * Also marks the request as done and runs
511  * the internal/external callbacks.
512  */
513  unlang_interpret_signal(*request_p, FR_SIGNAL_CANCEL);
514  *request_p = NULL;
515 }
516 
517 /** Enforce max_request_time
518  *
519  * Runs periodically, and tries to clean up requests which were received by the network
520  * thread more than max_request_time seconds ago. In the interest of not adding a
521  * timer for every packet, the requests are given a 1 second leeway.
522  *
523  * @param[in] el the worker's event list
524  * @param[in] when the current time
525  * @param[in] uctx the fr_worker_t.
526  */
527 static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
528 {
529  fr_time_t now = fr_time();
530  request_t *request;
531  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
532 
533  /*
534  * Look at the oldest requests, and see if they need to
535  * be deleted.
536  */
537  while ((request = fr_minmax_heap_min_peek(worker->time_order)) != NULL) {
538  fr_time_t cleanup;
539 
540  REQUEST_VERIFY(request);
541 
542  cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);
543  if (fr_time_gt(cleanup, now)) break;
544 
545  /*
546  * Waiting too long, delete it.
547  */
548  REDEBUG("Request has reached max_request_time - signalling it to stop");
549  (void) fr_minmax_heap_extract(worker->time_order, request);
550  worker_stop_request(&request);
551  }
552 
553  /*
554  * Reset the max request timer.
555  */
556  worker_max_request_timer(worker);
557 }
558 
559 /** See when we next need to service the time_order heap for "too old" packets
560  *
561  * Inserts a timer into the event list which will trigger when the packet that
562  * was received longest ago would be older than max_request_time.
563  */
564 static void worker_max_request_timer(fr_worker_t *worker)
565 {
566  fr_time_t cleanup;
567  request_t *request;
568 
569  /*
570  * No more requests, delete the timer.
571  */
572  request = fr_minmax_heap_max_peek(worker->time_order);
573  if (!request) return;
574 
575  cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);
576 
577  DEBUG2("Resetting cleanup timer to +%pV", fr_box_time_delta(worker->config.max_request_time));
578  if (fr_event_timer_at(worker, worker->el, &worker->ev_cleanup,
579  cleanup, worker_max_request_time, worker) < 0) {
580  ERROR("Failed inserting max_request_time timer");
581  }
582 }
583 
584 /** Start time tracking for a request, and mark it as runnable.
585  *
586  */
587 static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
588 {
589  /*
590  * New requests are inserted into the time order heap in
591  * strict time priority. Once they are in the list, they
592  * are only removed when the request is done / free'd.
593  */
594  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
595  (void) fr_minmax_heap_insert(worker->time_order, request);
596 
597  /*
598  * Bootstrap the async state machine with the initial
599  * state of the request.
600  */
601  RDEBUG3("Time tracking started in yielded state");
602  fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
603  fr_time_tracking_yield(&request->async->tracking, now);
604  worker->num_active++;
605 
606  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
607  (void) fr_heap_insert(&worker->runnable, request);
608 
609  if (!worker->ev_cleanup) worker_max_request_timer(worker);
610 }
611 
612 static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
613 {
614  RDEBUG3("Time tracking ended");
615  fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
616  fr_assert(worker->num_active > 0);
617  worker->num_active--;
618 
619  if (fr_minmax_heap_entry_inserted(request->time_order_id)) (void) fr_minmax_heap_extract(worker->time_order, request);
620 }
621 
622 /** Send a response packet to the network side
623  *
624  * @param[in] worker This worker.
625  * @param[in] request we're sending a reply for.
626  * @param[in] send_reply whether the network side sends a reply
627  * @param[in] now The current time
628  */
629 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
630 {
631  fr_channel_data_t *reply;
632  fr_channel_t *ch;
633  fr_message_set_t *ms;
634  size_t size = 1;
635 
636  REQUEST_VERIFY(request);
637 
638  /*
639  * If we're sending a reply, then it's no longer runnable.
640  */
641  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
642 
643  if (send_reply) {
644  size = request->async->listen->app_io->default_reply_size;
645  if (!size) size = request->async->listen->app_io->default_message_size;
646  }
647 
648  /*
649  * Allocate and send the reply.
650  */
651  ch = request->async->channel;
652  fr_assert(ch != NULL);
653 
654  /*
655  * If the channel has been closed, but we haven't
656  * been informed, that is extremely bad.
657  *
658  * Try to continue working... but we'll likely
659  * leak memory or SEGV soon.
660  */
661  if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
662  return;
663  }
664 
665  ms = fr_channel_responder_uctx_get(ch);
666  fr_assert(ms != NULL);
667 
668  reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
669  fr_assert(reply != NULL);
670 
671  /*
672  * Encode it, if required.
673  */
674  if (send_reply) {
675  ssize_t slen = 0;
676  fr_listen_t const *listen = request->async->listen;
677 
678  if (listen->app_io->encode) {
679  slen = listen->app_io->encode(listen->app_io_instance, request,
680  reply->m.data, reply->m.rb_size);
681  } else if (listen->app->encode) {
682  slen = listen->app->encode(listen->app_instance, request,
683  reply->m.data, reply->m.rb_size);
684  }
685  if (slen < 0) {
686  RPERROR("Failed encoding request");
687  *reply->m.data = 0;
688  slen = 1;
689  }
690 
691  /*
692  * Shrink the buffer to the actual packet size.
693  *
694  * This will ALWAYS return the same message as we put in.
695  */
696  fr_assert((size_t) slen <= reply->m.rb_size);
697  (void) fr_message_alloc(ms, &reply->m, slen);
698  }
699 
700  /*
701  * Fill in the rest of the fields in the channel message.
702  *
703  * sequence / ack will be filled in by fr_channel_send_reply()
704  */
705  reply->m.when = now;
706  reply->reply.cpu_time = worker->tracking.running_total;
707  reply->reply.processing_time = request->async->tracking.running_total;
708  reply->reply.request_time = request->async->recv_time;
709 
710  reply->listen = request->async->listen;
711  reply->packet_ctx = request->async->packet_ctx;
712 
713  /*
714  * Update the various timers.
715  */
716  fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
717  fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);
718 
719  RDEBUG("Finished request");
720 
721  /*
722  * Send the reply, which also polls the request queue.
723  */
724  if (fr_channel_send_reply(ch, reply) < 0) {
725  /*
726  * Should only happen if the TO_REQUESTOR
727  * channel is full, or it's not yet active.
728  *
729  * Not much we can do except complain
730  * loudly and cleanup the request.
731  */
732  RPERROR("Failed sending reply to network thread");
733  }
734 
735  worker->stats.out++;
736 
737  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
738  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
739 
740  fr_dlist_entry_unlink(&request->listen_entry);
741 
742 #ifndef NDEBUG
743  request->async->el = NULL;
744  request->async->channel = NULL;
745  request->async->packet_ctx = NULL;
746  request->async->listen = NULL;
747 #endif
748 }
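Both worker_nak() and worker_send_reply() use the same reserve/encode/shrink idiom on
the message set. A minimal sketch of that idiom in isolation, assuming a hypothetical
encoder encode_somehow() and a fixed worst-case size of 4096 bytes (neither is part of
this file):

static ssize_t encode_somehow(uint8_t *out, size_t outlen);	/* hypothetical */

static fr_channel_data_t *reply_encode_sketch(fr_message_set_t *ms)
{
	fr_channel_data_t *reply;
	ssize_t slen;

	/* Reserve the worst-case message size from the ring buffer... */
	reply = (fr_channel_data_t *) fr_message_reserve(ms, 4096);
	if (!reply) return NULL;

	/* ...encode into the reserved buffer... */
	slen = encode_somehow(reply->m.data, reply->m.rb_size);
	if (slen < 0) slen = 1;

	/* ...then shrink the message to the bytes actually used, returning
	 * the memory the encoder didn't need to the buffer. */
	(void) fr_message_alloc(ms, &reply->m, (size_t) slen);

	return reply;
}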
749 
750 /*
751  * talloc_typed_asprintf() is horrifically slow for printing
752  * simple numbers.
753  */
754 static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
755 {
756  char buffer[32];
757  char *p;
758  char const *numbers = "0123456789";
759 
760  p = buffer + 30;
761  *(p--) = '\0';
762 
763  while (number > 0) {
764  *(p--) = numbers[number % 10];
765  number /= 10;
766  }
767 
768  if (p[1]) return talloc_strdup(ctx, p + 1);
769 
770  return talloc_strdup(ctx, "0");
771 }
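A usage sketch for the helper above (the context and values are hypothetical):

static void itoa_internal_sketch(TALLOC_CTX *ctx)
{
	char *name;

	name = itoa_internal(ctx, 12345);	/* returns a talloc'd "12345" */
	talloc_free(name);

	name = itoa_internal(ctx, 0);		/* digit loop never runs; the p[1] check yields "0" */
	talloc_free(name);
}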
772 
773 /** Initialize various request fields needed by the worker.
774  *
775  */
776 static inline CC_HINT(always_inline)
777 void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
778 {
779  /*
780  * For internal requests request->packet
781  * and request->reply are already populated.
782  */
783  if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
784  if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
785 
786  request->packet->timestamp = now;
787  request->async = talloc_zero(request, fr_async_t);
788  request->async->recv_time = now;
789  request->async->el = worker->el;
790  fr_dlist_entry_init(&request->async->entry);
791 }
792 
793 static inline CC_HINT(always_inline)
794 void worker_request_name_number(request_t *request)
795 {
796  request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
797  if (request->name) talloc_const_free(request->name);
798  request->name = itoa_internal(request, request->number);
799 }
800 
801 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
802 {
803  int ret = -1;
804  request_t *request;
805  TALLOC_CTX *ctx;
806  fr_listen_t const *listen;
807 
808  if (fr_minmax_heap_num_elements(worker->time_order) >= (uint32_t) worker->config.max_requests) goto nak;
809 
810  ctx = request = request_alloc_external(NULL, NULL);
811  if (!request) goto nak;
812 
813  worker_request_init(worker, request, now);
814  worker_request_name_number(request);
815 
816  /*
817  * Associate our interpreter with the request
818  */
819  unlang_interpret_set(request, worker->intp);
820 
821  request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */
822 
823  /*
824  * Receive a message to the worker queue, and decode it
825  * to a request.
826  */
827  fr_assert(cd->listen != NULL);
828 
829  /*
830  * Update the transport-specific fields.
831  */
832  request->async->channel = cd->channel.ch;
833 
834  request->async->recv_time = cd->request.recv_time;
835 
836  request->async->listen = cd->listen;
837  request->async->packet_ctx = cd->packet_ctx;
838  request->async->priority = cd->priority;
839  listen = request->async->listen;
840 
841  /*
842  * Now that the "request" structure has been initialized, go decode the packet.
843  *
844  * Note that this also sets the "async process" function.
845  */
846  if (listen->app->decode) {
847  ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
848  } else if (listen->app_io->decode) {
849  ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
850  }
851 
852  if (ret < 0) {
853  talloc_free(ctx);
854 nak:
855  worker_nak(worker, cd, now);
856  return;
857  }
858 
859  /*
860  * Set the entry point for this virtual server.
861  */
862  if (unlang_call_push(request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
863  RERROR("Protocol failed to set 'process' function");
864  worker_nak(worker, cd, now);
865  return;
866  }
867 
868  /*
869  * We're done with this message.
870  */
871  fr_message_done(&cd->m);
872 
873  /*
874  * Look for conflicting / duplicate packets, but only if
875  * requested to do so.
876  */
877  if (request->async->listen->track_duplicates) {
878  request_t *old;
879 
880  old = fr_rb_find(worker->dedup, request);
881  if (!old) {
882  goto insert_new;
883  }
884 
885  fr_assert(old->async->listen == request->async->listen);
886  fr_assert(old->async->channel == request->async->channel);
887 
888  /*
889  * There's a new packet. Do we keep the old one,
890  * or the new one? This decision is made by
891  * checking the recv_time, which is a
892  * nanosecond-resolution timer. If the time is
893  * identical, then the new packet is the same as
894  * the old one.
895  *
896  * If the new packet is a duplicate of the old
897  * one, then we can just discard the new one. We
898  * have to tell the channel that we've "eaten"
899  * this reply, so the sequence number should
900  * increase.
901  *
902  * @todo - fix the channel code to do queue
903  * depth, and not sequence / ack.
904  */
905  if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
906  RWARN("Discarding duplicate of request (%"PRIu64")", old->number);
907 
908  fr_channel_null_reply(request->async->channel);
909  talloc_free(request);
910 
911  /*
912  * Signal there's a dup, and ignore the
913  * return code. We don't bother replying
914  * here, as an FD event or timer will
915  * wake up the request, and cause it to
916  * continue.
917  *
918  * @todo - the old request is NOT
919  * running, but is yielded. It MAY clean
920  * itself up, or do something...
921  */
922  unlang_interpret_signal(old, FR_SIGNAL_DUP);
923  worker->stats.dup++;
924  return;
925  }
926 
927  /*
928  * Stop the old request, and decrement the number
929  * of active requests.
930  */
931  RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);
932 
933  worker_stop_request(&old);
934  worker->stats.dropped++;
935 
936  insert_new:
937  (void) fr_rb_insert(worker->dedup, request);
938  }
939 
940  worker_request_time_tracking_start(worker, request, now);
941 
942  {
943  fr_worker_listen_t *wl;
944 
945  wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
946  if (!wl) {
947  MEM(wl = talloc_zero(worker, fr_worker_listen_t));
948  fr_dlist_init(&wl->dlist, request_t, listen_entry);
949  wl->listener = listen;
950 
951  (void) fr_rb_insert(worker->listeners, wl);
952  }
953 
954  fr_dlist_insert_tail(&wl->dlist, request);
955  }
956 }
957 
958 /**
959  * Track a request_t in the "runnable" heap.
960  * Higher priorities take precedence, followed by lower sequence numbers
961  */
962 static int8_t worker_runnable_cmp(void const *one, void const *two)
963 {
964  request_t const *a = one, *b = two;
965  int ret;
966 
967  ret = CMP(b->async->priority, a->async->priority);
968  if (ret != 0) return ret;
969 
970  ret = CMP(a->async->sequence, b->async->sequence);
971  if (ret != 0) return ret;
972 
973  return fr_time_cmp(a->async->recv_time, b->async->recv_time);
974 }
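A standalone sketch of the ordering this comparator produces, using stub values rather
than real request_t structures (illustrative only):

#include <stdio.h>

struct stub { int priority; int sequence; };

static int stub_runnable_cmp(struct stub a, struct stub b)
{
	/* CMP(b.priority, a.priority): higher priority sorts first */
	int ret = (b.priority > a.priority) - (b.priority < a.priority);
	if (ret != 0) return ret;

	/* CMP(a.sequence, b.sequence): then the lower sequence number */
	return (a.sequence > b.sequence) - (a.sequence < b.sequence);
}

int main(void)
{
	struct stub hi = { .priority = 2, .sequence = 7 };
	struct stub lo = { .priority = 1, .sequence = 3 };

	printf("%d\n", stub_runnable_cmp(hi, lo));	/* -1: priority 2 runs before priority 1 */
	printf("%d\n", stub_runnable_cmp(lo, lo));	/* 0: identical keys compare equal */
	return 0;
}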
975 
976 /**
977  * Track a request_t in the "time_order" heap.
978  */
979 static int8_t worker_time_order_cmp(void const *one, void const *two)
980 {
981  request_t const *a = one, *b = two;
982 
983  return fr_time_cmp(a->async->recv_time, b->async->recv_time);
984 }
985 
986 /**
987  * Track a request_t in the "dedup" tree
988  */
989 static int8_t worker_dedup_cmp(void const *one, void const *two)
990 {
991  int ret;
992  request_t const *a = one, *b = two;
993 
994  ret = CMP(a->async->listen, b->async->listen);
995  if (ret) return ret;
996 
997  return CMP(a->async->packet_ctx, b->async->packet_ctx);
998 }
999 
1000 /** Destroy a worker
1001  *
1002  * The input channels are signaled, and local messages are cleaned up.
1003  *
1004  * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
1005  * error has occurred on the worker side, and we need to destroy it.
1006  *
1007  * We signal all pending requests in the backlog to stop, and tell the
1008  * network side that it should not send us any more requests.
1009  *
1010  * @param[in] worker the worker to destroy.
1011  */
1012 void fr_worker_destroy(fr_worker_t *worker)
1013 {
1014  int i, count;
1015  request_t *request;
1016 
1017 // WORKER_VERIFY;
1018 
1019  /*
1020  * Stop any new requests running with this interpreter
1021  */
1022  unlang_interpret_set_thread_default(NULL);
1023 
1024  /*
1025  * Destroy all of the active requests. These are ones
1026  * which are still waiting for timers or file descriptor
1027  * events.
1028  */
1029  count = 0;
1030  while ((request = fr_minmax_heap_min_peek(worker->time_order)) != NULL) {
1031  if (count < 10) {
1032  DEBUG("Worker is exiting - telling request %s to stop", request->name);
1033  count++;
1034  }
1035  worker_stop_request(&request);
1036  }
1037  fr_assert(fr_heap_num_elements(worker->runnable) == 0);
1038 
1039  /*
1040  * Signal the channels that we're closing.
1041  *
1042  * The other end owns the channel, and will take care of
1043  * popping messages in the TO_RESPONDER queue, and marking
1044  * them FR_MESSAGE_DONE. It will ignore the messages in
1045  * the TO_REQUESTOR queue, as we own those. They will be
1046  * automatically freed when our talloc context is freed.
1047  */
1048  for (i = 0; i < worker->config.max_channels; i++) {
1049  if (!worker->channel[i].ch) continue;
1050 
1051  worker_requests_cancel(&worker->channel[i]);
1052 
1054  "Pending messages in channel after cancelling request");
1055 
1057  }
1058 
1059  talloc_free(worker);
1060 }
1061 
1062 /** Internal request (i.e. one generated by the interpreter) is being initialised
1063  *
1064  */
1065 static void _worker_request_internal_init(request_t *request, void *uctx)
1066 {
1067  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1068  fr_time_t now = fr_time();
1069 
1070  worker_request_init(worker, request, now);
1071 
1072  /*
1073  * Requests generated by the interpreter
1074  * are always marked up as internal.
1075  */
1076  fr_assert(request_is_internal(request));
1077  worker_request_time_tracking_start(worker, request, now);
1078 }
1079 
1080 
1081 /** External request is now complete
1082  *
1083  */
1084 static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1085 {
1086  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1087  fr_time_t now = fr_time();
1088 
1089  /*
1090  * All external requests MUST have a listener.
1091  */
1092  fr_assert(request_is_external(request));
1093  fr_assert(request->async->listen != NULL);
1094 
1095  /*
1096  * Only real packets are in the dedup tree. And even
1097  * then, only some of the time.
1098  */
1099  if (request->async->listen->track_duplicates) {
1100  (void) fr_rb_delete(worker->dedup, request);
1101  }
1102 
1103  /*
1104  * If we're running a real request, then the final
1105  * indentation MUST be zero. Otherwise we skipped
1106  * something!
1107  *
1108  * Also check that the request is NOT marked as
1109  * "yielded", but is in fact done.
1110  *
1111  * @todo - check that the stack is at frame 0, otherwise
1112  * more things have gone wrong.
1113  */
1114  fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
1115  "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1117  "Request %s is marked as yielded at end of processing", request->name);
1119  "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
1120  RDEBUG("Done request");
1121 
1122  /*
1123  * The request is done. Track that.
1124  */
1125  worker_request_time_tracking_end(worker, request, now);
1126 
1127  /*
1128  * Remove it from the list of requests associated with this channel.
1129  */
1130  if (fr_dlist_entry_in_list(&request->async->entry)) {
1131  fr_dlist_entry_unlink(&request->async->entry);
1132  }
1133 
1134  /*
1135  * These conditions are true when the server is
1136  * exiting and we're stopping all the requests.
1137  *
1138  * This should never happen otherwise.
1139  */
1140  if (unlikely((request->master_state == REQUEST_STOP_PROCESSING) &&
1141  !fr_channel_active(request->async->channel))) {
1142  talloc_free(request);
1143  return;
1144  }
1145 
1146  worker_send_reply(worker, request, request->master_state != REQUEST_STOP_PROCESSING, now);
1147  talloc_free(request);
1148 }
1149 
1150 /** Internal request (i.e. one generated by the interpreter) is now complete
1151  *
1152  * Whatever generated the request is now responsible for freeing it.
1153  */
1154 static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1155 {
1156  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1157 
1158  worker_request_time_tracking_end(worker, request, fr_time());
1159 
1160  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1161  fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
1162  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1163 }
1164 
1165 /** Detached request (i.e. one generated by the interpreter with no parent) is now complete
1166  *
1167  * As the request has no parent, there's nothing else to free it,
1168  * so we have to.
1169  */
1170 static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1171 {
1172  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1173 
1174  /*
1175  * No time tracking for detached requests
1176  * so we don't need to call
1177  * worker_request_time_tracking_end.
1178  */
1179  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1180 
1181  /*
1182  * Normally worker_request_time_tracking_end
1183  * would remove the request from the time
1184  * order heap, but we need to do that for
1185  * detached requests.
1186  */
1187  (void)fr_minmax_heap_extract(worker->time_order, request);
1188 
1189  fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1190 
1191  /*
1192  * Detached requests have to be freed by us
1193  * as nothing else can free them.
1194  *
1195  * All other requests must be freed by the
1196  * code which allocated them.
1197  */
1198  talloc_free(request);
1199 }
1200 
1201 
1202 /** Make us responsible for running the request
1203  *
1204  */
1205 static void _worker_request_detach(request_t *request, void *uctx)
1206 {
1207  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1208 
1209  if (request_is_detachable(request)) {
1210  /*
1211  * End the time tracking... We don't track detached requests,
1212  * because they don't contribute to the time consumed by an
1213  * external request.
1214  */
1215  if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1216  RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1217  fr_time_tracking_resume(&request->async->tracking, fr_time());
1218  }
1219  worker_request_time_tracking_end(worker, request, fr_time());
1220 
1221  if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1222 
1223  RDEBUG3("Request is detached");
1224  } else {
1225  fr_assert_msg(0, "Request is not detachable");
1226  }
1227 
1228  return;
1229 }
1230 
1231 /** This is called by the interpreter when it wants to stop a request
1232  *
1233  * The idea is to get the request into the same state it would be in
1234  * if the interpreter had just finished with it.
1235  */
1236 static void _worker_request_stop(request_t *request, void *uctx)
1237 {
1238  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1239 
1240  RDEBUG3("Cleaning up request execution state");
1241 
1242  /*
1243  * Make sure time tracking is always in a
1244  * consistent state when we mark the request
1245  * as done.
1246  */
1247  if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1248  RDEBUG3("Forcing time tracking to running state, from yielded, for request stop");
1249  fr_time_tracking_resume(&request->async->tracking, fr_time());
1250  }
1251 
1252  /*
1253  * If the request is in the runnable queue
1254  * yank it back out, so it's not "runnable"
1255  * when we call request done.
1256  */
1257  if (fr_heap_entry_inserted(request->runnable_id)) fr_heap_extract(&worker->runnable, request);
1258 }
1259 
1260 /** Request is now runnable
1261  *
1262  */
1263 static void _worker_request_runnable(request_t *request, void *uctx)
1264 {
1265  fr_worker_t *worker = uctx;
1266 
1267  RDEBUG3("Request marked as runnable");
1268  fr_heap_insert(&worker->runnable, request);
1269 }
1270 
1271 /** Interpreter yielded request
1272  *
1273  */
1274 static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1275 {
1276  RDEBUG3("Request yielded");
1277  fr_time_tracking_yield(&request->async->tracking, fr_time());
1278 }
1279 
1280 /** Interpreter is starting to work on request again
1281  *
1282  */
1283 static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1284 {
1285  RDEBUG3("Request resuming");
1286  fr_time_tracking_resume(&request->async->tracking, fr_time());
1287 }
1288 
1289 /** Check if a request is scheduled
1290  *
1291  */
1292 static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1293 {
1294  return fr_heap_entry_inserted(request->runnable_id);
1295 }
1296 
1297 /** Run a request
1298  *
1299  * Until it either yields, or is done.
1300  *
1301  * This function is also responsible for sending replies, and
1302  * cleaning up the request.
1303  *
1304  * @param[in] worker the worker
1305  * @param[in] start the current time
1306  */
1307 static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
1308 {
1309  request_t *request;
1310  fr_time_t now;
1311 
1312  WORKER_VERIFY;
1313 
1314  now = start;
1315 
1316  /*
1317  * Busy-loop running requests for 1ms. We still poll the
1318  * event loop 1000 times a second, OR when there's no
1319  * more work to do. This allows us to make progress with
1320  * ongoing requests, at the expense of sometimes ignoring
1321  * new ones.
1322  */
1323  while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
1324  ((request = fr_heap_pop(&worker->runnable)) != NULL)) {
1325 
1326  REQUEST_VERIFY(request);
1327  fr_assert(!fr_heap_entry_inserted(request->runnable_id));
1328 
1329  /*
1330  * For real requests, if the channel is gone,
1331  * just stop the request and free it.
1332  */
1333  if (request->async->channel && !fr_channel_active(request->async->channel)) {
1334  worker_stop_request(&request);
1335  return;
1336  }
1337 
1338  (void)unlang_interpret(request);
1339 
1340  now = fr_time();
1341  }
1342 }
1343 
1344 /** Create a worker
1345  *
1346  * @param[in] ctx the talloc context
1347  * @param[in] name the name of this worker
1348  * @param[in] el the event list
1349  * @param[in] logger the destination for all logging messages
1350  * @param[in] lvl log level
1351  * @param[in] config various configuration parameters
1352  * @return
1353  * - NULL on error
1354  * - fr_worker_t on success
1355  */
1356 fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
1357  fr_worker_config_t *config)
1358 {
1359  fr_worker_t *worker;
1360 
1361  worker = talloc_zero(ctx, fr_worker_t);
1362  if (!worker) {
1363 nomem:
1364  fr_strerror_const("Failed allocating memory");
1365  return NULL;
1366  }
1367 
1368  worker->name = talloc_strdup(worker, name); /* thread locality */
1369 
1370  unlang_thread_instantiate(worker);
1371 
1372  if (config) worker->config = *config;
1373 
1374 #define CHECK_CONFIG(_x, _min, _max) do { \
1375  if (!worker->config._x) worker->config._x = _min; \
1376  if (worker->config._x < _min) worker->config._x = _min; \
1377  if (worker->config._x > _max) worker->config._x = _max; \
1378  } while (0)
1379 
1380 #define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1381  if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1382  if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1383  } while (0)
1384 
1385  CHECK_CONFIG(max_requests, 1024, (1 << 30));
1386  CHECK_CONFIG(max_channels, 64, 1024);
1387  CHECK_CONFIG(talloc_pool_size, 4096, 65536);
1388  CHECK_CONFIG(message_set_size, 1024, 8192);
1389  CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
1390  CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));
1391 
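A standalone sketch of the clamping these macros perform, with a local copy of the
CHECK_CONFIG logic and hypothetical values (not part of the original file):

#include <stdio.h>

#define CHECK(_x, _min, _max) do { \
	if (!(_x)) (_x) = (_min); \
	if ((_x) < (_min)) (_x) = (_min); \
	if ((_x) > (_max)) (_x) = (_max); \
} while (0)

int main(void)
{
	unsigned int message_set_size = 0;	/* unset in the config */

	CHECK(message_set_size, 1024, 8192);
	printf("%u\n", message_set_size);	/* 1024: unset values get the minimum */

	message_set_size = 100000;
	CHECK(message_set_size, 1024, 8192);
	printf("%u\n", message_set_size);	/* 8192: oversized values are clamped */
	return 0;
}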
1392  worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
1393  if (!worker->channel) {
1394  talloc_free(worker);
1395  goto nomem;
1396  }
1397 
1398  worker->thread_id = pthread_self();
1399  worker->el = el;
1400  worker->log = logger;
1401  worker->lvl = lvl;
1402 
1403  /*
1404  * The worker thread starts now. Manually initialize it,
1405  * because we're tracking request time, not the time that
1406  * the worker thread is running.
1407  */
1408  memset(&worker->tracking, 0, sizeof(worker->tracking));
1409 
1410  worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
1411  if (!worker->aq_control) {
1412  fr_strerror_const("Failed creating atomic queue");
1413  fail:
1414  talloc_free(worker);
1415  return NULL;
1416  }
1417 
1418  worker->control = fr_control_create(worker, el, worker->aq_control);
1419  if (!worker->control) {
1420  fr_strerror_const_push("Failed creating control plane");
1421  goto fail;
1422  }
1423 
1424  if (fr_control_callback_add(worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
1425  fr_strerror_const_push("Failed adding control channel");
1426  goto fail;
1427  }
1428 
1429  if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN, worker, worker_listen_cancel_callback) < 0) {
1430  fr_strerror_const_push("Failed adding callback for listeners");
1431  goto fail;
1432  }
1433 
1434  worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0);
1435  if (!worker->runnable) {
1436  fr_strerror_const("Failed creating runnable heap");
1437  goto fail;
1438  }
1439 
1440  worker->time_order = fr_minmax_heap_talloc_alloc(worker, worker_time_order_cmp, request_t, time_order_id, 0);
1441  if (!worker->time_order) {
1442  fr_strerror_const("Failed creating time_order heap");
1443  goto fail;
1444  }
1445 
1446  worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
1447  if (!worker->dedup) {
1448  fr_strerror_const("Failed creating de_dup tree");
1449  goto fail;
1450  }
1451 
1452  worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
1453  if (!worker->listeners) {
1454  fr_strerror_const("Failed creating listener tree");
1455  goto fail;
1456  }
1457 
1458  worker->intp = unlang_interpret_init(worker, el,
1459  &(unlang_request_func_t){
1460  .init_internal = _worker_request_internal_init,
1461 
1462  .done_external = _worker_request_done_external,
1463  .done_internal = _worker_request_done_internal,
1464  .done_detached = _worker_request_done_detached,
1465 
1466  .detach = _worker_request_detach,
1467  .stop = _worker_request_stop,
1468  .yield = _worker_request_yield,
1469  .resume = _worker_request_resume,
1470  .mark_runnable = _worker_request_runnable,
1471 
1472  .scheduled = _worker_request_scheduled
1473  },
1474  worker);
1475  if (!worker->intp){
1476  fr_strerror_const("Failed initialising interpreter");
1477  goto fail;
1478  }
1479  unlang_interpret_set_thread_default(worker->intp);
1480 
1481  return worker;
1482 }
1483 
1484 
1485 /** The main loop and entry point of the stand-alone worker thread.
1486  *
1487  * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
1488  * instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
1489  * request.
1490  *
1491  * @param[in] worker the worker data structure to manage
1492  */
1493 void fr_worker(fr_worker_t *worker)
1494 {
1495  WORKER_VERIFY;
1496 
1497  while (true) {
1498  bool wait_for_event;
1499  int num_events;
1500 
1501  WORKER_VERIFY;
1502 
1503  /*
1504  * There are runnable requests. We still service
1505  * the event loop, but we don't wait for events.
1506  */
1507  wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
1508  if (wait_for_event) {
1509  if (worker->exiting && (fr_minmax_heap_num_elements(worker->time_order) == 0)) break;
1510 
1511  DEBUG4("Ready to process requests");
1512  }
1513 
1514  /*
1515  * Check the event list. If there's an error
1516  * (e.g. exit), we stop looping and clean up.
1517  */
1518  DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
1519  num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
1520  if (num_events < 0) {
1521  PERROR("Failed retrieving events");
1522  break;
1523  }
1524 
1525  DEBUG4("%u event(s) pending%s",
1526  num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
1527 
1528  /*
1529  * Service outstanding events.
1530  */
1531  if (num_events > 0) {
1532  DEBUG4("Servicing event(s)");
1533  fr_event_service(worker->el);
1534  }
1535 
1536  /*
1537  * Run any outstanding requests.
1538  */
1539  worker_run_request(worker, fr_time());
1540  }
1541 }
1542 
1543 /** Pre-event handler
1544  *
1545  * This should be run ONLY in single-threaded mode!
1546  */
1547 int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
1548 {
1549  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1550  request_t *request;
1551 
1552  request = fr_heap_peek(worker->runnable);
1553  if (!request) return 0;
1554 
1555  /*
1556  * There's work to do. Tell the event handler to poll
1557  * for IO / timers, but also immediately return to the
1558  * calling function, which has more work to do.
1559  */
1560  return 1;
1561 }
1562 
1563 
1564 /** Post-event handler
1565  *
1566  * This should be run ONLY in single-threaded mode!
1567  */
1568 void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
1569 {
1570  fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1571 
1572  worker_run_request(worker, fr_time()); /* Event loop time can be too old, and trigger asserts */
1573 }
1574 
1575 /** Print debug information about the worker structure
1576  *
1577  * @param[in] worker the worker
1578  * @param[in] fp the file where the debug output is printed.
1579  */
1580 void fr_worker_debug(fr_worker_t *worker, FILE *fp)
1581 {
1582  WORKER_VERIFY;
1583 
1584  fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
1585  fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);
1586 
1587  fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
1588  fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
1589  fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
1590  fr_time_delta_unwrap(worker->tracking.running_total) / worker->stats.in);
1591 
1592  fr_time_tracking_debug(&worker->tracking, fp);
1593 
1594 }
1595 
1596 /** Create a channel to the worker
1597  *
1598  * Called by the master (i.e. network) thread when it needs to create
1599  * a new channel to a particular worker.
1600  *
1601  * @param[in] worker the worker
1602  * @param[in] master the control plane of the master
1603  * @param[in] ctx the context in which the channel will be created
1604  */
1605 fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
1606 {
1607  fr_channel_t *ch;
1608  pthread_t id;
1609  bool same;
1610 
1611  WORKER_VERIFY;
1612 
1613  id = pthread_self();
1614  same = (pthread_equal(id, worker->thread_id) != 0);
1615 
1616  ch = fr_channel_create(ctx, master, worker->control, same);
1617  if (!ch) return NULL;
1618 
1619  fr_channel_set_recv_request(ch, worker, worker_recv_request);
1620 
1621  /*
1622  * Tell the worker about the channel
1623  */
1624  if (fr_channel_signal_open(ch) < 0) {
1625  talloc_free(ch);
1626  return NULL;
1627  }
1628 
1629  return ch;
1630 }
1631 
1632 int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
1633 {
1634  fr_ring_buffer_t *rb;
1635 
1636  /*
1637  * Skip a bunch of work if we're already in the worker thread.
1638  */
1639  if (is_worker_thread(worker)) {
1640  return fr_worker_listen_cancel_self(worker, li);
1641  }
1642 
1643  rb = fr_worker_rb_init();
1644  if (!rb) return -1;
1645 
1646  return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
1647 }
1648 
1649 #ifdef WITH_VERIFY_PTR
1650 /** Verify the worker data structures.
1651  *
1652  * @param[in] worker the worker
1653  */
1654 static void worker_verify(fr_worker_t *worker)
1655 {
1656  int i;
1657 
1658  (void) talloc_get_type_abort(worker, fr_worker_t);
1659  fr_atomic_queue_verify(worker->aq_control);
1660 
1661  fr_assert(worker->control != NULL);
1662  (void) talloc_get_type_abort(worker->control, fr_control_t);
1663 
1664  fr_assert(worker->el != NULL);
1665  (void) talloc_get_type_abort(worker->el, fr_event_list_t);
1666 
1667  fr_assert(worker->runnable != NULL);
1668  (void) talloc_get_type_abort(worker->runnable, fr_heap_t);
1669 
1670  fr_assert(worker->dedup != NULL);
1671  (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
1672 
1673  for (i = 0; i < worker->config.max_channels; i++) {
1674  if (!worker->channel[i].ch) continue;
1675 
1676  (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
1677  }
1678 }
1679 #endif
1680 
1681 int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
1682 {
1683  if (num < 0) return -1;
1684  if (num == 0) return 0;
1685 
1686  stats[0] = worker->stats.in;
1687  if (num >= 2) stats[1] = worker->stats.out;
1688  if (num >= 3) stats[2] = worker->stats.dup;
1689  if (num >= 4) stats[3] = worker->stats.dropped;
1690  if (num >= 5) stats[4] = worker->num_naks;
1691  if (num >= 6) stats[5] = worker->num_active;
1692 
1693  if (num <= 6) return num;
1694 
1695  return 6;
1696 }
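A sketch of how a caller might snapshot the six counters filled in above (assumes
<stdio.h> and <inttypes.h>; the function name is illustrative):

static void worker_stats_sketch(fr_worker_t const *worker)
{
	uint64_t stats[6] = { 0 };

	if (fr_worker_stats(worker, 6, stats) < 0) return;

	printf("in=%" PRIu64 " out=%" PRIu64 " dup=%" PRIu64
	       " dropped=%" PRIu64 " naks=%" PRIu64 " active=%" PRIu64 "\n",
	       stats[0], stats[1], stats[2], stats[3], stats[4], stats[5]);
}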
1697 
1698 static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
1699 {
1700  fr_worker_t const *worker = ctx;
1701  fr_time_delta_t when;
1702 
1703  if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
1704  fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
1705  fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
1706  fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
1707  fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
1708  fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
1709  fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
1710  fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
1711  }
1712 
1713  if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
1714  when = worker->predicted;
1715  fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1716 
1717  when = worker->tracking.running_total;
1718  if (fr_time_delta_ispos(when)) when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
1719  fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1720 
1721  when = worker->tracking.running_total;
1722  fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1723 
1724  when = worker->tracking.waiting_total;
1725  fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1726 
1727  fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
1728  fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
1729  }
1730 
1731  return 0;
1732 }
1733 
1734 fr_cmd_table_t cmd_worker_table[] = {
1735  {
1736  .parent = "stats",
1737  .name = "worker",
1738  .help = "Statistics for worker threads.",
1739  .read_only = true
1740  },
1741 
1742  {
1743  .parent = "stats worker",
1744  .add_name = true,
1745  .name = "self",
1746  .syntax = "[(count|cpu)]",
1747  .func = cmd_stats_worker,
1748  .help = "Show statistics for a specific worker thread.",
1749  .read_only = true
1750  },
1751 
1752  CMD_TABLE_END
1753 };
static int const char char buffer[256]
Definition: acutest.h:574
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition: app_io.h:55
size_t default_reply_size
same for replies
Definition: app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition: app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition: app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition: app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition: application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition: application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition: atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Definition: atomic_queue.c:80
Structure to hold the atomic queue.
Definition: atomic_queue.c:54
#define atomic_uint64_t
Definition: atomic_queue.h:42
#define RCSID(id)
Definition: build.h:481
#define NDEBUG_UNUSED
Definition: build.h:324
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition: build.h:110
#define unlikely(_x)
Definition: build.h:379
#define UNUSED
Definition: build.h:313
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition: call.c:147
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition: channel.c:183
fr_table_num_sorted_t const channel_signals[]
Definition: channel.c:153
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition: channel.c:897
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition: channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition: channel.c:938
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition: channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition: channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition: channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition: channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition: channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition: channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition: channel.c:952
A full channel, which consists of two ends.
Definition: channel.c:144
fr_message_t m
the message header
Definition: channel.h:105
fr_channel_event_t
Definition: channel.h:67
@ FR_CHANNEL_NOOP
Definition: channel.h:74
@ FR_CHANNEL_EMPTY
Definition: channel.h:75
@ FR_CHANNEL_CLOSE
Definition: channel.h:72
@ FR_CHANNEL_ERROR
Definition: channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition: channel.h:70
@ FR_CHANNEL_OPEN
Definition: channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition: channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition: channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition: channel.h:146
uint32_t priority
Priority of this packet.
Definition: channel.h:140
Channel information which is added to a message.
Definition: channel.h:104
int argc
current argument count
Definition: command.h:39
char const * parent
e.g. "show module"
Definition: command.h:52
#define CMD_TABLE_END
Definition: command.h:62
char const ** argv
text version of commands
Definition: command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition: control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition: control.h:56
#define FR_CONTROL_ID_LISTEN
Definition: control.h:57
#define FR_CONTROL_MAX_SIZE
Definition: control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition: control.h:50
static fr_ring_buffer_t * rb
Definition: control_test.c:51
fr_dcursor_eval_t void const * uctx
Definition: dcursor.h:546
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition: debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition: debug.h:156
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition: dlist.h:672
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition: dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition: dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition: dlist.h:378
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition: dlist.h:138
Head of a doubly linked list.
Definition: dlist.h:51
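A minimal sketch of the fr_dlist pattern used throughout worker.c (e.g. the per-listener request list): the head records the offset of the fr_dlist_t field inside the item type. The item type and field names here are illustrative:

```c
typedef struct {
	fr_dlist_t	entry;		/* linkage into the list */
	int		id;
} my_item_t;

static void dlist_demo(void)
{
	fr_dlist_head_t	head;
	my_item_t	a = { .id = 1 };
	my_item_t	*item;

	fr_dlist_init(&head, my_item_t, entry);	/* head learns the offset of 'entry' */
	fr_dlist_insert_tail(&head, &a);

	while ((item = fr_dlist_pop_head(&head)) != NULL) {
		/* ... cancel or free each item, as worker_requests_cancel() does ... */
	}
}
```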
#define fr_event_timer_at(...)
Definition: event.h:250
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition: heap.c:322
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition: heap.c:146
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition: heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition: heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition: heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition: heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition: heap.h:115
The main heap structure.
Definition: heap.h:66
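A minimal sketch of how a heap like the worker's "runnable" heap might be built from these functions; the element type, comparator, and priority field are illustrative (the worker's real comparator is worker_runnable_cmp(), listed further down):

```c
typedef struct {
	fr_heap_index_t	idx;		/* maintained by the heap */
	uint32_t	priority;
} job_t;

static int8_t job_cmp(void const *one, void const *two)
{
	job_t const *a = one, *b = two;

	return CMP(a->priority, b->priority);
}

static void heap_demo(TALLOC_CTX *ctx)
{
	fr_heap_t	*h = fr_heap_talloc_alloc(ctx, job_cmp, job_t, idx, 0);
	job_t		*job = talloc_zero(ctx, job_t);

	fr_heap_insert(&h, job);	/* may resize the heap, hence &h */
	job = fr_heap_peek(h);		/* inspect the top without removing it */
	job = fr_heap_pop(&h);		/* remove and return the top element */
}
```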
rlm_rcode_t unlang_interpret(request_t *request)
Run the interpreter for a current request.
Definition: interpret.c:770
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition: interpret.c:1745
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition: interpret.c:1275
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition: interpret.c:1776
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize an unlang compiler / interpreter.
Definition: interpret.c:1703
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition: interpret.c:1196
bool unlang_interpret_is_resumable(request_t *request)
Check if a request is resumable.
Definition: interpret.c:1341
#define UNLANG_TOP_FRAME
Definition: interpret.h:35
External functions provided by the owner of the interpreter.
Definition: interpret.h:100
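A minimal sketch of how a worker might hand a request to its thread-local interpreter, assuming intp was created earlier with unlang_interpret_init(); the function name is illustrative and resumption handling is elided:

```c
static void run_one(request_t *request, unlang_interpret_t *intp)
{
	rlm_rcode_t rcode;

	unlang_interpret_set(request, intp);	/* this request runs on our interpreter */
	rcode = unlang_interpret(request);	/* returns the rcode, or yields */

	if (unlang_interpret_is_resumable(request)) {
		/* the request yielded; it is re-run once it becomes runnable again */
	}
}
```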
uint64_t out
Definition: base.h:43
uint64_t dup
Definition: base.h:44
uint64_t dropped
Definition: base.h:45
uint64_t in
Definition: base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition: control.c:149
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition: control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition: control.c:343
The control structure.
Definition: control.c:79
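A minimal sketch tying the control-plane pieces together: an atomic queue backs the control structure, callbacks are registered per message ID, and messages are marshalled through a ring buffer. The callback body and payload are illustrative:

```c
static void control_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	/* ... decode and act on the control message ... */
}

static void control_demo(TALLOC_CTX *ctx, fr_event_list_t *el, fr_ring_buffer_t *rb)
{
	fr_atomic_queue_t	*aq = fr_atomic_queue_alloc(ctx, FR_CONTROL_MAX_MESSAGES);
	fr_control_t		*control = fr_control_create(ctx, el, aq);
	uint32_t		payload = 0;

	fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, NULL, control_recv);
	fr_control_message_send(control, rb, FR_CONTROL_ID_CHANNEL, &payload, sizeof(payload));
}
```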
void const * app_instance
Definition: listen.h:38
fr_app_t const * app
Definition: listen.h:37
void const * app_io_instance
I/O path configuration context.
Definition: listen.h:32
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition: listen.h:40
fr_app_io_t const * app_io
I/O path functions.
Definition: listen.h:31
Minimal data structure to use the new code.
Definition: listen.h:58
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
#define RDEBUG3(fmt,...)
Definition: log.h:343
#define RWARN(fmt,...)
Definition: log.h:297
#define RERROR(fmt,...)
Definition: log.h:298
#define DEBUG4(_fmt,...)
Definition: log.h:267
#define RPERROR(fmt,...)
Definition: log.h:302
#define RPEDEBUG(fmt,...)
Definition: log.h:376
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition: event.c:2335
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition: event.c:2549
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition: event.c:2414
talloc_free(reap)
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
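fr_event_corral() and fr_event_service() form the two-phase event-loop idiom the worker's main loop is built on; a minimal sketch, with illustrative names:

```c
static void event_loop(fr_event_list_t *el, bool const *exiting)
{
	while (!*exiting) {
		/* Gather pending timer/fd events; 'true' blocks until one is ready. */
		if (fr_event_corral(el, fr_time(), true) < 0) break;

		fr_event_service(el);	/* run the callbacks for what was gathered */
	}
}
```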
fr_log_lvl_t
Definition: log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition: packet.c:38
unsigned int uint32_t
Definition: merged_model.c:33
long int ssize_t
Definition: merged_model.c:24
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition: message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition: message.c:190
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition: message.c:934
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition: message.c:1238
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition: message.c:988
A Message set, composed of message headers and ring buffer data.
Definition: message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition: message.h:51
fr_time_t when
when this message was sent
Definition: message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition: message.h:49
size_t data_size
size of the data in the ring buffer
Definition: message.h:50
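A minimal sketch of the reserve → alloc → done message lifecycle used when encoding replies. The sizes are illustrative, and real callers pass the size of their wrapper type (e.g. fr_channel_data_t) rather than sizeof(fr_message_t):

```c
static void message_demo(TALLOC_CTX *ctx)
{
	fr_message_set_t	*ms = fr_message_set_create(ctx, 1024, sizeof(fr_message_t), 65536);
	fr_message_t		*m;

	m = fr_message_reserve(ms, 4096);	/* reserve the most we might write */
	/* ... encode up to 4096 bytes into m->data ... */
	m = fr_message_alloc(ms, m, 100);	/* shrink to the 100 bytes actually used */

	fr_message_done(m);			/* let the set recycle the message */
	fr_message_set_gc(ms);			/* optionally compact the ring buffers */
}
```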
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:424
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:449
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
Definition: minmax_heap.c:466
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
Definition: minmax_heap.c:533
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
Definition: minmax_heap.c:486
static bool fr_minmax_heap_entry_inserted(fr_minmax_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition: minmax_heap.h:93
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition: minmax_heap.h:85
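A minimal sketch of a "time_order"-style minmax heap, where both the oldest (min) and newest (max) entries can be peeked cheaply; the element type is illustrative:

```c
typedef struct {
	fr_minmax_heap_index_t	idx;		/* maintained by the heap */
	fr_time_t		recv_time;
} tracked_t;

static int8_t tracked_cmp(void const *one, void const *two)
{
	tracked_t const *a = one, *b = two;

	return fr_time_cmp(a->recv_time, b->recv_time);
}

static void minmax_demo(TALLOC_CTX *ctx)
{
	fr_minmax_heap_t	*mh = fr_minmax_heap_talloc_alloc(ctx, tracked_cmp, tracked_t, idx, 0);
	tracked_t		*t = talloc_zero(ctx, tracked_t);
	tracked_t		*oldest;

	fr_minmax_heap_insert(mh, t);
	oldest = fr_minmax_heap_min_peek(mh);	/* first candidate to age out */
	fr_minmax_heap_extract(mh, oldest);	/* remove a specific entry */
}
```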
static const conf_parser_t config[]
Definition: base.c:183
static rc_stats_t stats
Definition: radclient-ng.c:74
#define REDEBUG(fmt,...)
Definition: radclient.h:52
#define RDEBUG(fmt,...)
Definition: radclient.h:53
#define DEBUG2(fmt,...)
Definition: radclient.h:43
static void send_reply(int sockfd, fr_channel_data_t *reply)
Definition: radius1_test.c:190
static bool cleanup
Definition: radsniff.c:60
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition: rb.h:246
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
The main red black tree structure.
Definition: rb.h:73
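A minimal sketch of a tree keyed like the worker's "listeners" tree, ordering entries by listener pointer; the type and field names are illustrative:

```c
typedef struct {
	fr_rb_node_t		node;		/* linkage into the tree */
	fr_listen_t const	*listener;
} tracked_listen_t;

static int8_t tracked_listen_cmp(void const *one, void const *two)
{
	tracked_listen_t const *a = one, *b = two;

	return CMP(a->listener, b->listener);
}

static void rb_demo(TALLOC_CTX *ctx, fr_listen_t const *li)
{
	fr_rb_tree_t		*tree = fr_rb_inline_talloc_alloc(ctx, tracked_listen_t, node,
								  tracked_listen_cmp, NULL);
	tracked_listen_t	*tl = talloc_zero(ctx, tracked_listen_t);
	tracked_listen_t	needle = { .listener = li };

	tl->listener = li;
	fr_rb_insert(tree, tl);

	tl = fr_rb_find(tree, &needle);		/* look up by key */
	if (tl) fr_rb_delete(tree, tl);
}
```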
rlm_rcode_t
Return codes indicating the result of the module call.
Definition: rcode.h:40
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition: request.c:668
#define REQUEST_VERIFY(_x)
Definition: request.h:276
#define request_is_detached(_x)
Definition: request.h:160
#define request_is_external(_x)
Definition: request.h:158
#define request_is_internal(_x)
Definition: request.h:159
#define request_is_detachable(_x)
Definition: request.h:161
#define request_alloc_external(_ctx, _args)
Allocate a new external request.
Definition: request.h:295
@ REQUEST_STOP_PROCESSING
Request has been signalled to stop.
Definition: request.h:62
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition: ring_buffer.c:64
static char const * name
return count
Definition: module.c:163
fr_assert(0)
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition: state_test.c:8
@ memory_order_seq_cst
Definition: stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:302
Definition: log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition: table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition: talloc.h:224
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition: time.c:580
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition: time.c:625
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition: time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition: time.h:154
#define fr_time_delta_lt(_a, _b)
Definition: time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition: time.h:590
#define fr_time_delta_wrap(_time)
Definition: time.h:152
#define fr_time_delta_ispos(_a)
Definition: time.h:290
#define fr_time_eq(_a, _b)
Definition: time.h:241
#define NSEC
Definition: time.h:379
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
#define fr_time_gt(_a, _b)
Definition: time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition: time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition: time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition: time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition: time.h:80
"server local" time.
Definition: time.h:69
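A minimal sketch of fr_time_t / fr_time_delta_t arithmetic with the helpers above: adding a delta to a time yields a time, and subtracting two times yields a delta. The values are illustrative:

```c
static void time_demo(void)
{
	fr_time_t	now = fr_time();
	fr_time_delta_t	timeout = fr_time_delta_from_sec(30);
	fr_time_t	deadline = fr_time_add(now, timeout);	/* time + delta = time */
	fr_time_delta_t	elapsed;

	if (fr_time_gt(fr_time(), deadline)) {
		/* ... the 30s deadline has passed ... */
	}

	elapsed = fr_time_sub(fr_time(), now);			/* time - time = delta */
	if (fr_time_delta_lt(elapsed, fr_time_delta_from_msec(100))) {
		/* ... completed in under 100ms ... */
	}
}
```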
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
Definition: time_tracking.h:44
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
Definition: time_tracking.h:80
fr_time_delta_t running_total
total time spent running
Definition: time_tracking.h:79
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
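A minimal sketch of the tracking lifecycle the worker applies to each request: start under a parent tracker, yield while waiting, resume when runnable again, then end and fold the measurement into a prediction. The parameter names are illustrative:

```c
static void tracking_demo(fr_time_tracking_t *worker_tt, fr_time_tracking_t *request_tt)
{
	fr_time_delta_t predicted = fr_time_delta_wrap(0);

	fr_time_tracking_start(worker_tt, request_tt, fr_time());	/* request begins running */
	fr_time_tracking_yield(request_tt, fr_time());			/* blocked, e.g. on I/O */
	fr_time_tracking_resume(request_tt, fr_time());			/* runnable again */
	fr_time_tracking_end(&predicted, request_tt, fr_time());	/* done; update prediction */
}
```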
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition: compile.c:5122
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition: strerror.c:733
#define fr_strerror_const_push(_msg)
Definition: strerror.h:227
#define fr_strerror_const(_msg)
Definition: strerror.h:223
static fr_slen_t data
Definition: value.h:1265
#define fr_box_time_delta(_val)
Definition: value.h:343
static int8_t worker_time_order_cmp(void const *one, void const *two)
Track a request_t in the "time_order" heap.
Definition: worker.c:979
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition: worker.c:754
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition: worker.c:113
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition: worker.c:612
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition: worker.c:1274
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Detached request (i.e. one generated by the interpreter with no parent) is now complete.
Definition: worker.c:1170
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition: worker.c:248
fr_event_list_t * el
our event list
Definition: worker.c:109
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition: worker.c:1547
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition: worker.c:629
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition: worker.c:117
fr_channel_t * ch
Definition: worker.c:82
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition: worker.c:1307
static void worker_exit(fr_worker_t *worker)
Definition: worker.c:226
#define WORKER_VERIFY
Definition: worker.c:73
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition: worker.c:1605
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition: worker.c:129
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition: worker.c:1698
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition: worker.c:1263
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition: worker.c:989
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
Definition: worker.c:507
fr_worker_channel_t * channel
list of channels
Definition: worker.c:136
char const * name
name of this worker
Definition: worker.c:95
uint64_t num_active
number of active requests
Definition: worker.c:124
fr_event_timer_t const * ev_cleanup
timer for max_request_time
Definition: worker.c:134
fr_cmd_table_t cmd_worker_table[]
Definition: worker.c:1734
static _Thread_local fr_ring_buffer_t * fr_worker_rb
Definition: worker.c:79
fr_minmax_heap_t * time_order
time ordered heap of requests
Definition: worker.c:114
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition: worker.c:1681
static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition: worker.c:587
fr_dlist_head_t dlist
Definition: worker.c:88
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on the request again.
Definition: worker.c:1283
fr_rb_tree_t * dedup
de-dup tree
Definition: worker.c:115
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition: worker.c:105
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition: worker.c:414
static void worker_request_name_number(request_t *request)
Definition: worker.c:794
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
Definition: worker.c:1236
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition: worker.c:801
fr_log_t const * log
log destination
Definition: worker.c:102
fr_io_stats_t stats
input / output stats
Definition: worker.c:119
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition: worker.c:1205
static int _fr_worker_rb_free(void *arg)
Definition: worker.c:164
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition: worker.c:127
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition: worker.c:173
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition: worker.c:1084
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition: worker.c:1012
uint64_t num_naks
number of messages which were nak'd
Definition: worker.c:123
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition: worker.c:777
fr_worker_config_t config
external configuration
Definition: worker.c:96
fr_listen_t const * listener
incoming packets
Definition: worker.c:140
unlang_interpret_t * intp
Worker's local interpreter.
Definition: worker.c:98
fr_time_t checked_timeout
when we last checked the tails of the queues
Definition: worker.c:132
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:365
bool exiting
are we exiting?
Definition: worker.c:130
fr_log_lvl_t lvl
log level
Definition: worker.c:103
static void worker_max_request_timer(fr_worker_t *worker)
See when we next need to service the time_order heap for "too old" packets.
Definition: worker.c:564
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition: worker.c:217
int num_channels
actual number of channels
Definition: worker.c:111
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition: worker.c:1356
static atomic_uint64_t request_number
Definition: worker.c:77
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition: worker.c:120
fr_rb_node_t node
in tree of listeners
Definition: worker.c:142
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition: worker.c:1632
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition: worker.c:1568
fr_dlist_head_t dlist
of requests associated with this listener.
Definition: worker.c:148
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition: worker.c:1493
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition: worker.c:126
pthread_t thread_id
my thread ID
Definition: worker.c:100
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition: worker.c:207
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition: worker.c:121
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition: worker.c:152
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition: worker.c:527
static bool is_worker_thread(fr_worker_t const *worker)
Definition: worker.c:191
fr_control_t * control
the control plane
Definition: worker.c:107
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition: worker.c:1292
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e. one generated by the interpreter) is now complete.
Definition: worker.c:1154
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition: worker.c:1580
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e. one generated by the interpreter) is being initialised.
Definition: worker.c:1065
#define CACHE_LINE_SIZE
Definition: worker.c:76
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition: worker.c:962
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition: worker.c:392
A worker which takes packets from a master, and processes them.
Definition: worker.c:94
int message_set_size
default start number of messages
Definition: worker.h:63
int max_requests
max requests this worker will handle
Definition: worker.h:59
int max_channels
maximum number of channels
Definition: worker.h:61
int ring_buffer_size
default start size for the ring buffers
Definition: worker.h:64
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition: worker.h:66