worker.c
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 /**
18 * $Id: dc7b30f411ff6e99f3bca2dc1992f8b79969e50a $
19 *
20 * @brief Worker thread functions.
21 * @file io/worker.c
22 *
23 * The "worker" thread is the one responsible for the bulk of the
24 * work done when processing a request. Workers are spawned by the
25 * scheduler, and create a kqueue (KQ) and an Atomic Queue (AQ)
26 * for control-plane communication.
27 *
28 * When a network thread discovers that it needs more workers, it
29 * asks the scheduler for a KQ/AQ combination. The network thread
30 * then creates a channel dedicated to that worker, and sends the
31 * channel to the worker in a "new channel" message. The worker
32 * receives the channel, and sends an ACK back to the network thread.
33 *
34 * The network thread then sends the worker new packets, which the
35 * worker receives and processes.
36 *
37 * When a packet is decoded, it is put into the "runnable" heap, and
38 * also into the timeout sublist. The main loop fr_worker() then
39 * pulls new requests off of this heap and runs them. The main event
40 * loop checks the head of the timeout sublist, and forcefully terminates
41 * any requests which have been running for too long.
42 *
43 * If a request is yielded, it is placed onto the yielded list in
44 * the worker "tracking" data structure.
45 *
46 * @copyright 2016 Alan DeKok (aland@freeradius.org)
47 */
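/*
 *	A rough lifecycle sketch, using only functions defined in this file.
 *	Error handling is elided, and "el", "log" and "config" are assumed to
 *	come from the scheduler which spawns the worker:
 *
 *	@code
 *	fr_worker_t *worker;
 *
 *	worker = fr_worker_alloc(NULL, el, "worker-0", log, L_DBG_LVL_1, config);
 *
 *	// ... the network side calls fr_worker_channel_create() to get a
 *	// dedicated channel to us, then starts sending packets ...
 *
 *	fr_worker(worker);		// run the main loop until all channels close
 *	fr_worker_destroy(worker);	// cancel pending requests, free the worker
 *	@endcode
 */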
48
49RCSID("$Id: dc7b30f411ff6e99f3bca2dc1992f8b79969e50a $")
50
51#define LOG_PREFIX worker->name
52#define LOG_DST worker->log
53
54#include <freeradius-devel/io/channel.h>
55#include <freeradius-devel/io/listen.h>
56#include <freeradius-devel/io/message.h>
57#include <freeradius-devel/io/worker.h>
58#include <freeradius-devel/unlang/base.h>
59#include <freeradius-devel/unlang/call.h>
60#include <freeradius-devel/unlang/interpret.h>
61#include <freeradius-devel/server/request.h>
62#include <freeradius-devel/server/time_tracking.h>
63#include <freeradius-devel/util/dlist.h>
64#include <freeradius-devel/util/minmax_heap.h>
65#include <freeradius-devel/util/slab.h>
66#include <freeradius-devel/util/time.h>
67#include <freeradius-devel/util/timer.h>
68
69#include <stdalign.h>
70
71#ifdef WITH_VERIFY_PTR
72static void worker_verify(fr_worker_t *worker);
73#define WORKER_VERIFY worker_verify(worker)
74#else
75#define WORKER_VERIFY
76#endif
77
78 #define CACHE_LINE_SIZE 64
79 static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;
80
81 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
82 static void worker_stop_request(request_t **request_p);
83
84 static _Thread_local fr_ring_buffer_t *fr_worker_rb;
85
86 typedef struct {
87 fr_channel_t *ch; //!< the channel
88
89 /*
90 * To save time, we don't care about num_elements here. Which means that we don't
91 * need to cache or lookup the fr_worker_listen_t when we free a request.
92 */
93 fr_dlist_head_t dlist; //!< list of requests associated with this channel
94} fr_worker_channel_t;
95
96/**
97 * A worker which takes packets from a master, and processes them.
98 */
99 struct fr_worker_s {
100 char const *name; //!< name of this worker
101 fr_worker_config_t config; //!< external configuration
102
103 unlang_interpret_t *intp; //!< Worker's local interpreter.
104
105 pthread_t thread_id; //!< my thread ID
106
107 fr_log_t const *log; //!< log destination
108 fr_log_lvl_t lvl; //!< log level
109
110 fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
111
112 fr_control_t *control; //!< the control plane
113
114 fr_event_list_t *el; //!< our event list
115
116 int num_channels; //!< actual number of channels
117
118 fr_heap_t *runnable; //!< current runnable requests which we've spent time processing
119
120 fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist.
121 fr_timer_list_t *timeout_custom; //!< Track when requests timeout using an lst.
122 ///< requests must always be in one of these lists.
123 fr_time_delta_t max_request_time; //!< maximum time a request can be processed
124
125 fr_rb_tree_t *dedup; //!< de-dup tree
126
127 fr_rb_tree_t *listeners; //!< so we can cancel requests when a listener goes away
128
129 fr_io_stats_t stats; //!< input / output stats
130 fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
131 fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
132
133 uint64_t num_naks; //!< number of messages which were nak'd
134 uint64_t num_active; //!< number of active requests
135
136 fr_time_delta_t predicted; //!< How long we predict a request will take to execute.
137 fr_time_tracking_t tracking; //!< how much time the worker has spent doing things.
138
139 bool was_sleeping; //!< used to suppress multiple sleep signals in a row
140 bool exiting; //!< are we exiting?
141
142 fr_worker_channel_t *channel; //!< list of channels
143
144 request_slab_list_t *slab; //!< slab allocator for request_t
145};
146
147typedef struct {
148 fr_listen_t const *listener; //!< incoming packets
149
150 fr_rb_node_t node; //!< in tree of listeners
151
152 /*
153 * To save time, we don't care about num_elements here. Which means that we don't
154 * need to cache or lookup the fr_worker_listen_t when we free a request.
155 */
156 fr_dlist_head_t dlist; //!< of requests associated with this listener.
157} fr_worker_listen_t;
158
159
160static int8_t worker_listener_cmp(void const *one, void const *two)
161{
162 fr_worker_listen_t const *a = one, *b = two;
163
164 return CMP(a->listener, b->listener);
165}
166
167
168/*
169 * Explicitly cleanup the memory allocated to the ring buffer,
170 * just in case valgrind complains about it.
171 */
172static int _fr_worker_rb_free(void *arg)
173{
174 return talloc_free(arg);
175}
176
177/** Initialise thread local storage
178 *
179 * @return fr_ring_buffer_t for messages
180 */
181 static fr_ring_buffer_t *fr_worker_rb_init(void)
182{
183 fr_ring_buffer_t *rb;
184
185 rb = fr_worker_rb;
186 if (rb) return rb;
187
188 rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
189 if (!rb) {
190 fr_perror("Failed allocating memory for worker ring buffer");
191 return NULL;
192 }
193
194 fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
195
196 return rb;
197}
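/*
 *	The ring buffer returned here is scratch space for control-plane
 *	messages sent to the worker from other threads, e.g. the pattern
 *	used by fr_worker_listen_cancel() below:
 *
 *	@code
 *	fr_ring_buffer_t *rb = fr_worker_rb_init();
 *
 *	if (rb) fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
 *	@endcode
 */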
198
199static inline bool is_worker_thread(fr_worker_t const *worker)
200{
201 return (pthread_equal(pthread_self(), worker->thread_id) != 0);
202}
203
205 static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now);
206
207/** Callback which handles a message being received on the worker side.
208 *
209 * @param[in] ctx the worker
210 * @param[in] ch the channel to drain
211 * @param[in] cd the message (if any) to start with
212 */
213static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
214{
215 fr_worker_t *worker = ctx;
216
217 worker->stats.in++;
218 DEBUG3("Received request %" PRIu64 "", worker->stats.in);
219 cd->channel.ch = ch;
220 worker_request_bootstrap(worker, cd, fr_time());
221}
222
223 static void worker_requests_cancel(fr_worker_channel_t *ch)
224{
225 request_t *request;
226
227 while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
228 worker_stop_request(&request);
229 }
230}
231
232static void worker_exit(fr_worker_t *worker)
233{
234 worker->exiting = true;
235
236 /*
237 * Don't allow the post event to run
238 * any more requests. They'll be
239 * signalled to stop before we exit.
240 *
241 * This only has an effect in single
242 * threaded mode.
243 */
244 (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
245}
246
247/** Handle a control plane message sent to the worker via a channel
248 *
249 * @param[in] ctx the worker
250 * @param[in] data the message
251 * @param[in] data_size size of the data
252 * @param[in] now the current time
253 */
254static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
255{
256 int i;
257 bool ok, was_sleeping;
258 fr_channel_t *ch;
259 fr_channel_event_t ce;
260 fr_message_set_t *ms;
261 fr_worker_t *worker = ctx;
262
263 was_sleeping = worker->was_sleeping;
264 worker->was_sleeping = false;
265
266 /*
267 * We were woken up by a signal to do something. We're
268 * not sleeping.
269 */
270 ce = fr_channel_service_message(now, &ch, data, data_size);
271 DEBUG3("Channel %s",
272 fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
273 switch (ce) {
274 case FR_CHANNEL_ERROR:
275 return;
276
277 case FR_CHANNEL_EMPTY:
278 return;
279
280 case FR_CHANNEL_NOOP:
281 return;
282
283 case FR_CHANNEL_DATA_READY_REQUESTOR:
284 fr_assert(0 == 1);
285 break;
286
287 case FR_CHANNEL_DATA_READY_RESPONDER:
288 fr_assert(ch != NULL);
289
290 if (!fr_channel_recv_request(ch)) {
291 worker->was_sleeping = was_sleeping;
292
293 } else while (fr_channel_recv_request(ch));
294 break;
295
296 case FR_CHANNEL_OPEN:
297 fr_assert(ch != NULL);
298
299 ok = false;
300 for (i = 0; i < worker->config.max_channels; i++) {
301 fr_assert(worker->channel[i].ch != ch);
302
303 if (worker->channel[i].ch != NULL) continue;
304
305 worker->channel[i].ch = ch;
306 fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
307
308 DEBUG3("Received channel %p into array entry %d", ch, i);
309
310 ms = fr_message_set_create(worker, worker->config.message_set_size,
311 sizeof(fr_channel_data_t),
312 worker->config.ring_buffer_size);
313 fr_assert(ms != NULL);
314 fr_channel_responder_uctx_add(ch, ms);
315
316 worker->num_channels++;
317 ok = true;
318 break;
319 }
320
321 fr_cond_assert(ok);
322 break;
323
324 case FR_CHANNEL_CLOSE:
325 fr_assert(ch != NULL);
326
327 ok = false;
328
329 /*
330 * Locate the signalling channel in the list
331 * of channels.
332 */
333 for (i = 0; i < worker->config.max_channels; i++) {
334 if (!worker->channel[i].ch) continue;
335
336 if (worker->channel[i].ch != ch) continue;
337
338 worker_requests_cancel(&worker->channel[i]);
339
340 (void) fr_channel_responder_ack_close(ch);
341
342 fr_assert_msg(!fr_channel_recv_request(ch),
343 "Network added messages to channel after sending FR_CHANNEL_CLOSE");
344
345 ms = fr_channel_responder_uctx_get(ch);
346 fr_assert(ms != NULL);
347 fr_message_set_gc(ms);
348 talloc_free(ms);
349
350 worker->channel[i].ch = NULL;
351
352 fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
353 fr_assert(worker->num_channels > 0);
354
355 worker->num_channels--;
356 ok = true;
357 break;
358 }
359
360 fr_cond_assert(ok);
361
362 /*
363 * Our last input channel closed,
364 * time to die.
365 */
366 if (worker->num_channels == 0) worker_exit(worker);
367 break;
368 }
369}
370
371 static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
372{
373 fr_worker_listen_t *wl;
374 request_t *request;
375
376 wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
377 if (!wl) return -1;
378
379 while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
380 RDEBUG("Canceling request due to socket being closed");
381 worker_stop_request(&request);
382 }
383
384 (void) fr_rb_delete(worker->listeners, wl);
385 talloc_free(wl);
386
387 return 0;
388}
389
390
391/** A socket is going away, so clean up any requests which use this socket.
392 *
393 * @param[in] ctx the worker
394 * @param[in] data the message
395 * @param[in] data_size size of the data
396 * @param[in] now the current time
397 */
398static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
399{
400 fr_listen_t const *li;
401 fr_worker_t *worker = ctx;
402
403 fr_assert(data_size == sizeof(li));
404
405 memcpy(&li, data, sizeof(li));
406
407 (void) fr_worker_listen_cancel_self(worker, li);
408}
409
410/** Send a NAK to the network thread
411 *
412 * The network thread believes that a worker is running a request until that request has been NAK'd.
413 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
414 * or there was an error executing the request.
415 *
416 * @param[in] worker the worker
417 * @param[in] cd the message to NAK
418 * @param[in] now when the message is NAKd
419 */
420static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
421{
422 size_t size;
423 fr_channel_data_t *reply;
424 fr_channel_t *ch;
425 fr_message_set_t *ms;
426 fr_listen_t *listen;
427
428 worker->num_naks++;
429
430 /*
431 * Cache the outbound channel. We'll need it later.
432 */
433 ch = cd->channel.ch;
434 listen = cd->listen;
435
436 /*
437 * If the channel has been closed, but we haven't
438 * been informed, that is extremely bad.
439 *
440 * Try to continue working... but we'll likely
441 * leak memory or SEGV soon.
442 */
443 if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
444 fr_message_done(&cd->m);
445 return;
446 }
447
448 ms = fr_channel_responder_uctx_get(ch);
449 fr_assert(ms != NULL);
450
451 size = listen->app_io->default_reply_size;
452 if (!size) size = listen->app_io->default_message_size;
453
454 /*
455 * Allocate a default message size.
456 */
457 reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
458 fr_assert(reply != NULL);
459
460 /*
461 * Encode a NAK
462 */
463 if (listen->app_io->nak) {
464 size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
465 cd->m.data_size, reply->m.data, reply->m.rb_size);
466 } else {
467 size = 1; /* rely on them to figure it the heck out */
468 }
469
470 (void) fr_message_alloc(ms, &reply->m, size);
471
472 /*
473 * Fill in the NAK.
474 */
475 reply->m.when = now;
476 reply->reply.cpu_time = worker->tracking.running_total;
477 reply->reply.processing_time = fr_time_delta_from_sec(10); /* @todo - set to something better? */
478 reply->reply.request_time = cd->request.recv_time;
479
480 reply->listen = cd->listen;
481 reply->packet_ctx = cd->packet_ctx;
482
483 /*
484 * Mark the original message as done.
485 */
486 fr_message_done(&cd->m);
487
488 /*
489 * Send the reply, which also polls the request queue.
490 */
491 if (fr_channel_send_reply(ch, reply) < 0) {
492 DEBUG2("Failed sending reply to channel");
493 }
494
495 worker->stats.out++;
496}
497
498/** Signal the unlang interpreter that it needs to stop running the request
499 *
500 * Signalling is a synchronous operation. Whatever I/O requests the request
501 * is currently performing are immediately cancelled, and all the frames are
502 * popped off the unlang stack.
503 *
504 * Modules and unlang keywords explicitly register signal handlers to deal
505 * with their yield points being cancelled/interrupted via this function.
506 *
507 * The caller should assume the request is no longer viable after calling
508 * this function.
509 *
510 * @param[in] request_p Pointer to the request to cancel.
511 * Will be set to NULL.
512 */
513static void worker_stop_request(request_t **request_p)
514{
515 /*
516 * Also marks the request as done and runs
517 * the internal/external callbacks.
518 */
519 unlang_interpret_signal(*request_p, FR_SIGNAL_CANCEL);
520 *request_p = NULL;
521}
522
523/** Enforce max_request_time
524 *
525 * Runs when a request's timeout timer fires. Requests which have been
526 * running for longer than max_request_time are forcefully signalled to
527 * stop.
528 *
529 * @param[in] tl the worker's timer list.
530 * @param[in] when the current time
531 * @param[in] uctx the request_t timing out.
532 */
533 static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
534{
535 request_t *request = talloc_get_type_abort(uctx, request_t);
536
537 /*
538 * Waiting too long, delete it.
539 */
540 REDEBUG("Request has reached max_request_time - signalling it to stop");
541 worker_stop_request(&request);
542}
543
544/** Set, or re-set the request timer
545 *
546 * Automatically moves requests between the timer lists (timeout, timeout_custom).
547 *
548 * Can be used to set the initial timeout, or extend the timeout of a request.
549 *
550 * @param[in] worker the worker containing the timeout lists.
551 * @param[in] request that we're timing out.
552 * @param[in] timeout the timeout to set.
553 * @return
554 * - 0 on success.
555 * - -1 on failure.
556 */
557 int fr_worker_request_timeout_set(fr_worker_t *worker, request_t *request, fr_time_delta_t timeout)
558{
559 fr_timer_list_t *tl = fr_time_delta_eq(worker->config.max_request_time, timeout) ? worker->timeout : worker->timeout_custom;
560
561 /* No need to disarm, fr_timer_in() does it for us */
562
563 if (unlikely(fr_timer_in(request, tl, &request->timeout, timeout,
564 true, _worker_request_timeout, request) < 0)) {
565 RERROR("Failed to create request timeout timer");
566 return -1;
567 }
568
569 return 0;
570}
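/*
 *	Usage sketch (hypothetical timeout value): a caller which knows a
 *	request may legitimately outlive max_request_time can extend its
 *	timer before yielding:
 *
 *	@code
 *	if (fr_worker_request_timeout_set(worker, request, fr_time_delta_from_sec(30)) < 0) {
 *		RERROR("Failed extending request timeout");
 *	}
 *	@endcode
 *
 *	Timeouts equal to max_request_time stay in the cheap ordered list;
 *	any other value moves the request to the lst-based list.
 */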
571
572/** Start time tracking for a request, and mark it as runnable.
573 *
574 */
575 static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
576{
577 /*
578 * New requests are inserted into the time order heap in
579 * strict time priority. Once they are in the list, they
580 * are only removed when the request is done / free'd.
581 */
582 fr_assert(!fr_timer_armed(request->timeout));
583
584 if (unlikely(fr_worker_request_timeout_set(worker, request, worker->config.max_request_time) < 0)) {
585 RERROR("Failed to set request timeout");
586 return -1;
587 }
588
589 /*
590 * Bootstrap the async state machine with the initial
591 * state of the request.
592 */
593 RDEBUG3("Time tracking started in yielded state");
594 fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
595 fr_time_tracking_yield(&request->async->tracking, now);
596 worker->num_active++;
597
598 fr_assert(!fr_heap_entry_inserted(request->runnable));
599 (void) fr_heap_insert(&worker->runnable, request);
600
601 return 0;
602}
603
604 static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
605{
606 RDEBUG3("Time tracking ended");
607 fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
608 fr_assert(worker->num_active > 0);
609 worker->num_active--;
610
611 TALLOC_FREE(request->timeout); /* Disarm the request timer */
612}
613
614/** Send a response packet to the network side
615 *
616 * @param[in] worker This worker.
617 * @param[in] request we're sending a reply for.
618 * @param[in] send_reply whether the network side sends a reply
619 * @param[in] now The current time
620 */
621static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
622{
623 fr_channel_data_t *reply;
624 fr_channel_t *ch;
625 fr_message_set_t *ms;
626 size_t size = 1;
627
628 REQUEST_VERIFY(request);
629
630 /*
631 * If we're sending a reply, then it's no longer runnable.
632 */
633 fr_assert(!fr_heap_entry_inserted(request->runnable));
634
635 if (send_reply) {
636 size = request->async->listen->app_io->default_reply_size;
637 if (!size) size = request->async->listen->app_io->default_message_size;
638 }
639
640 /*
641 * Allocate and send the reply.
642 */
643 ch = request->async->channel;
644 fr_assert(ch != NULL);
645
646 /*
647 * If the channel has been closed, but we haven't
648 * been informed, that is extremely bad.
649 *
650 * Try to continue working... but we'll likely
651 * leak memory or SEGV soon.
652 */
653 if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
654 return;
655 }
656
657 ms = fr_channel_responder_uctx_get(ch);
658 fr_assert(ms != NULL);
659
660 reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
661 fr_assert(reply != NULL);
662
663 /*
664 * Encode it, if required.
665 */
666 if (send_reply) {
667 ssize_t slen = 0;
668 fr_listen_t const *listen = request->async->listen;
669
670 if (listen->app_io->encode) {
671 slen = listen->app_io->encode(listen->app_io_instance, request,
672 reply->m.data, reply->m.rb_size);
673 } else if (listen->app->encode) {
674 slen = listen->app->encode(listen->app_instance, request,
675 reply->m.data, reply->m.rb_size);
676 }
677 if (slen < 0) {
678 RPERROR("Failed encoding request");
679 *reply->m.data = 0;
680 slen = 1;
681 }
682
683 /*
684 * Shrink the buffer to the actual packet size.
685 *
686 * This will ALWAYS return the same message as we put in.
687 */
688 fr_assert((size_t) slen <= reply->m.rb_size);
689 (void) fr_message_alloc(ms, &reply->m, slen);
690 }
691
692 /*
693 * Fill in the rest of the fields in the channel message.
694 *
695 * sequence / ack will be filled in by fr_channel_send_reply()
696 */
697 reply->m.when = now;
698 reply->reply.cpu_time = worker->tracking.running_total;
699 reply->reply.processing_time = request->async->tracking.running_total;
700 reply->reply.request_time = request->async->recv_time;
701
702 reply->listen = request->async->listen;
703 reply->packet_ctx = request->async->packet_ctx;
704
705 /*
706 * Update the various timers.
707 */
708 fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
709 fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);
710
711 RDEBUG("Finished request");
712
713 /*
714 * Send the reply, which also polls the request queue.
715 */
716 if (fr_channel_send_reply(ch, reply) < 0) {
717 /*
718 * Should only happen if the TO_REQUESTOR
719 * channel is full, or it's not yet active.
720 *
721 * Not much we can do except complain
722 * loudly and cleanup the request.
723 */
724 RPERROR("Failed sending reply to network thread");
725 }
726
727 worker->stats.out++;
728
729 fr_assert(!fr_timer_armed(request->timeout));
730 fr_assert(!fr_heap_entry_inserted(request->runnable));
731
732 fr_dlist_entry_unlink(&request->listen_entry);
733
734#ifndef NDEBUG
735 request->async->el = NULL;
736 request->async->channel = NULL;
737 request->async->packet_ctx = NULL;
738 request->async->listen = NULL;
739#endif
740}
741
742/*
743 * talloc_typed_asprintf() is horrifically slow for printing
744 * simple numbers.
745 */
746static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
747{
748 char buffer[32];
749 char *p;
750 char const *numbers = "0123456789";
751
752 p = buffer + 30;
753 *(p--) = '\0';
754
755 while (number > 0) {
756 *(p--) = numbers[number % 10];
757 number /= 10;
758 }
759
760 if (p[1]) return talloc_strdup(ctx, p + 1);
761
762 return talloc_strdup(ctx, "0");
763}
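/*
 *	i.e. itoa_internal(ctx, 12345) returns a talloc'd "12345", equivalent
 *	to talloc_typed_asprintf(ctx, "%" PRIu64, (uint64_t) 12345) but
 *	without the format-parsing overhead.
 */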
764
765/** Initialize various request fields needed by the worker.
766 *
767 */
768static inline CC_HINT(always_inline)
769 void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
770{
771 /*
772 * For internal requests request->packet
773 * and request->reply are already populated.
774 */
775 if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
776 if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
777
778 request->packet->timestamp = now;
779 request->async = talloc_zero(request, fr_async_t);
780 request->async->recv_time = now;
781 request->async->el = worker->el;
782 fr_dlist_entry_init(&request->async->entry);
783}
784
785static inline CC_HINT(always_inline)
786 void worker_request_name_number(request_t *request)
787{
788 request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
789 if (request->name) talloc_const_free(request->name);
790 request->name = itoa_internal(request, request->number);
791}
792
793static inline CC_HINT(always_inline)
794 uint64_t worker_num_requests(fr_worker_t *worker)
795{
796 return fr_timer_list_num_events(worker->timeout) + fr_timer_list_num_events(worker->timeout_custom);
797}
798
799static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
800{
801 return request_slab_deinit(request);
802}
803
804 static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
805{
806 int ret = -1;
807 request_t *request;
808 TALLOC_CTX *ctx;
809 fr_listen_t *listen = cd->listen;
810
811 if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) {
812 RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests");
813 goto nak;
814 }
815
816 /*
817 * Receive a message to the worker queue, and decode it
818 * to a request.
819 */
820 fr_assert(listen != NULL);
821
822 ctx = request = request_slab_reserve(worker->slab);
823 if (!request) {
824 RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request");
825 goto nak;
826 }
827 /*
828 * Ensures that both the deinit function runs AND
829 * the request is returned to the slab if something
830 * calls talloc_free() on it.
831 */
832 request_slab_element_set_destructor(request, _worker_request_deinit, worker);
833
834 /*
835 * Have to initialise the request manually because the namespace
836 * changes based on the listener that allocated it.
837 */
838 if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) {
839 request_slab_release(request);
840 goto nak;
841 }
842
843 /*
844 * Do normal worker init that's shared between internal
845 * and external requests.
846 */
847 worker_request_init(worker, request, now);
848 worker_request_name_number(request);
849
850 /*
851 * Associate our interpreter with the request
852 */
853 unlang_interpret_set(request, worker->intp);
854
855 request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */
856
857 /*
858 * Update the transport-specific fields.
859 */
860 request->async->channel = cd->channel.ch;
861
862 request->async->recv_time = cd->request.recv_time;
863
864 request->async->listen = listen;
865 request->async->packet_ctx = cd->packet_ctx;
866 request->async->priority = cd->priority;
867
868 /*
869 * Now that the "request" structure has been initialized, go decode the packet.
870 *
871 * Note that this also sets the "async process" function.
872 */
873 if (listen->app->decode) {
874 ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
875 } else if (listen->app_io->decode) {
876 ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
877 }
878
879 if (ret < 0) {
880 talloc_free(ctx);
881nak:
882 worker_nak(worker, cd, now);
883 return;
884 }
885
886 /*
887 * Set the entry point for this virtual server.
888 */
889 if (unlang_call_push(request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
890 RERROR("Protocol failed to set 'process' function");
891 worker_nak(worker, cd, now);
892 return;
893 }
894
895 /*
896 * We're done with this message.
897 */
898 fr_message_done(&cd->m);
899
900 /*
901 * Look for conflicting / duplicate packets, but only if
902 * requested to do so.
903 */
904 if (request->async->listen->track_duplicates) {
905 request_t *old;
906
907 old = fr_rb_find(worker->dedup, request);
908 if (!old) {
909 goto insert_new;
910 }
911
912 fr_assert(old->async->listen == request->async->listen);
913 fr_assert(old->async->channel == request->async->channel);
914
915 /*
916 * There's a new packet. Do we keep the old one,
917 * or the new one? This decision is made by
918 * checking the recv_time, which is a
919 * nanosecond-resolution timer. If the time is
920 * identical, then the new packet is the same as
921 * the old one.
922 *
923 * If the new packet is a duplicate of the old
924 * one, then we can just discard the new one. We
925 * have to tell the channel that we've "eaten"
926 * this reply, so the sequence number should
927 * increase.
928 *
929 * @todo - fix the channel code to do queue
930 * depth, and not sequence / ack.
931 */
932 if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
933 RWARN("Discarding duplicate of request (%"PRIu64")", old->number);
934
935 fr_channel_null_reply(request->async->channel);
936 request_slab_release(request);
937
938 /*
939 * Signal there's a dup, and ignore the
940 * return code. We don't bother replying
941 * here, as an FD event or timer will
942 * wake up the request, and cause it to
943 * continue.
944 *
945 * @todo - the old request is NOT
946 * running, but is yielded. It MAY clean
947 * itself up, or do something...
948 */
949 (void) unlang_interpret_signal(old, FR_SIGNAL_DUP);
950 worker->stats.dup++;
951 return;
952 }
953
954 /*
955 * Stop the old request, and decrement the number
956 * of active requests.
957 */
958 RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);
959
960 worker_stop_request(&old);
961 worker->stats.dropped++;
962
963 insert_new:
964 (void) fr_rb_insert(worker->dedup, request);
965 }
966
967 worker_request_time_tracking_start(worker, request, now);
968
969 {
970 fr_worker_listen_t *wl;
971
972 wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
973 if (!wl) {
974 MEM(wl = talloc_zero(worker, fr_worker_listen_t));
975 fr_dlist_init(&wl->dlist, request_t, listen_entry);
976 wl->listener = listen;
977
978 (void) fr_rb_insert(worker->listeners, wl);
979 }
980
981 fr_dlist_insert_tail(&wl->dlist, request);
982 }
983}
984
985/**
986 * Track a request_t in the "runnable" heap.
987 * Higher priorities take precedence, followed by lower sequence numbers
988 */
989static int8_t worker_runnable_cmp(void const *one, void const *two)
990{
991 request_t const *a = one, *b = two;
992 int ret;
993
994 ret = CMP(b->async->priority, a->async->priority);
995 if (ret != 0) return ret;
996
997 ret = CMP(a->async->sequence, b->async->sequence);
998 if (ret != 0) return ret;
999
1000 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
1001}
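/*
 *	For example (hypothetical values): a request with priority 2,
 *	sequence 9 sorts before one with priority 1, sequence 3; between two
 *	priority-1 requests, the one with sequence 3 runs first.  recv_time
 *	only breaks ties when both priority and sequence are equal.
 */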
1002
1003/**
1004 * Track a request_t in the "dedup" tree
1005 */
1006static int8_t worker_dedup_cmp(void const *one, void const *two)
1007{
1008 int ret;
1009 request_t const *a = one, *b = two;
1010
1011 ret = CMP(a->async->listen, b->async->listen);
1012 if (ret) return ret;
1013
1014 return CMP(a->async->packet_ctx, b->async->packet_ctx);
1015}
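/*
 *	i.e. two packets are duplicates of each other when they come from the
 *	same listener and share the same packet_ctx.  What packet_ctx points
 *	at (and therefore what "duplicate" means) is up to the proto_*
 *	implementation which allocated it.
 */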
1016
1017/** Destroy a worker
1018 *
1019 * The input channels are signaled, and local messages are cleaned up.
1020 *
1021 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
1022 * error has occurred on the worker side, and we need to destroy it.
1023 *
1024 * We signal all pending requests in the backlog to stop, and tell the
1025 * network side that it should not send us any more requests.
1026 *
1027 * @param[in] worker the worker to destroy.
1028 */
1029 void fr_worker_destroy(fr_worker_t *worker)
1030{
1031 int i, count, ret;
1032
1033// WORKER_VERIFY;
1034
1035 /*
1036 * Stop any new requests running with this interpreter
1037 */
1038 unlang_interpret_set_thread_default(NULL);
1039
1040 /*
1041 * Destroy all of the active requests. These are ones
1042 * which are still waiting for timers or file descriptor
1043 * events.
1044 */
1045 count = 0;
1046
1047 /*
1048 * Force the timeout event to fire for all requests that
1049 * are still running.
1050 */
1051 ret = fr_timer_list_force_run(worker->timeout);
1052 if (unlikely(ret < 0)) {
1053 fr_assert_msg(0, "Failed to force run the timeout list");
1054 } else {
1055 count += ret;
1056 }
1057
1058 ret = fr_timer_list_force_run(worker->timeout_custom);
1059 if (unlikely(ret < 0)) {
1060 fr_assert_msg(0, "Failed to force run the custom timeout list");
1061 } else {
1062 count += ret;
1063 }
1065
1066 DEBUG("Worker is exiting - stopped %u requests", count);
1067
1068 /*
1069 * Signal the channels that we're closing.
1070 *
1071 * The other end owns the channel, and will take care of
1072 * popping messages in the TO_RESPONDER queue, and marking
1073 * them FR_MESSAGE_DONE. It will ignore the messages in
1074 * the TO_REQUESTOR queue, as we own those. They will be
1075 * automatically freed when our talloc context is freed.
1076 */
1077 for (i = 0; i < worker->config.max_channels; i++) {
1078 if (!worker->channel[i].ch) continue;
1079
1080 worker_requests_cancel(&worker->channel[i]);
1081
1083 "Pending messages in channel after cancelling request");
1084
1085 (void) fr_channel_responder_ack_close(worker->channel[i].ch);
1086 }
1087
1088 talloc_free(worker);
1089}
1090
1091/** Initialise an internal request (i.e. one generated by the interpreter)
1092 *
1093 */
1094static void _worker_request_internal_init(request_t *request, void *uctx)
1095{
1096 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1097 fr_time_t now = fr_time();
1098
1099 worker_request_init(worker, request, now);
1100
1101 /*
1102 * Requests generated by the interpreter
1103 * are always marked up as internal.
1104 */
1105 fr_assert(request_is_internal(request));
1106 worker_request_time_tracking_start(worker, request, now);
1107}
1108
1109
1110/** External request is now complete
1111 *
1112 */
1113static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1114{
1115 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1116 fr_time_t now = fr_time();
1117
1118 /*
1119 * All external requests MUST have a listener.
1120 */
1121 fr_assert(request_is_external(request));
1122 fr_assert(request->async->listen != NULL);
1123
1124 /*
1125 * Only real packets are in the dedup tree. And even
1126 * then, only some of the time.
1127 */
1128 if (request->async->listen->track_duplicates) {
1129 (void) fr_rb_delete(worker->dedup, request);
1130 }
1131
1132 /*
1133 * If we're running a real request, then the final
1134 * indentation MUST be zero. Otherwise we skipped
1135 * something!
1136 *
1137 * Also check that the request is NOT marked as
1138 * "yielded", but is in fact done.
1139 *
1140 * @todo - check that the stack is at frame 0, otherwise
1141 * more things have gone wrong.
1142 */
1143 fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
1144 "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
1146 "Request %s is marked as yielded at end of processing", request->name);
1148 "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
1149 RDEBUG("Done request");
1150
1151 /*
1152 * The request is done. Track that.
1153 */
1154 worker_request_time_tracking_end(worker, request, now);
1155
1156 /*
1157 * Remove it from the list of requests associated with this channel.
1158 */
1159 if (fr_dlist_entry_in_list(&request->async->entry)) {
1160 fr_dlist_entry_unlink(&request->async->entry);
1161 }
1162
1163 /*
1164 * These conditions are true when the server is
1165 * exiting and we're stopping all the requests.
1166 *
1167 * This should never happen otherwise.
1168 */
1169 if (unlikely((request->master_state == REQUEST_STOP_PROCESSING) &&
1170 !fr_channel_active(request->async->channel))) {
1171 request_slab_release(request);
1172 return;
1173 }
1174
1175 worker_send_reply(worker, request, request->master_state != REQUEST_STOP_PROCESSING, now);
1176 request_slab_release(request);
1177}
1178
1179/** Internal request (i.e. one generated by the interpreter) is now complete
1180 *
1181 * Whatever generated the request is now responsible for freeing it.
1182 */
1183static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1184{
1185 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1186
1187 worker_request_time_tracking_end(worker, request, fr_time());
1188
1189 fr_assert(!fr_heap_entry_inserted(request->runnable));
1190 fr_assert(!fr_timer_armed(request->timeout));
1191 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1192}
1193
1194/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
1195 *
1196 * As the request has no parent, there's nothing else to free it,
1197 * so we have to.
1198 */
1199static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
1200{
1201 /*
1202 * No time tracking for detached requests
1203 * so we don't need to call
1204 * worker_request_time_tracking_end.
1205 */
1206 fr_assert(!fr_heap_entry_inserted(request->runnable));
1207
1208 /*
1209 * Normally worker_request_time_tracking_end
1210 * would remove the request from the time
1211 * order heap, but we need to do that for
1212 * detached requests.
1213 */
1214 TALLOC_FREE(request->timeout);
1215
1216 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1217
1218 /*
1219 * Detached requests have to be freed by us
1220 * as nothing else can free them.
1221 *
1222 * All other requests must be freed by the
1223 * code which allocated them.
1224 */
1225 request_slab_release(request);
1226}
1227
1228
1229/** Make us responsible for running the request
1230 *
1231 */
1232static void _worker_request_detach(request_t *request, void *uctx)
1233{
1234 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1235
1236 if (request_is_detachable(request)) {
1237 /*
1238 * End the time tracking... We don't track detached requests,
1239 * because they don't contribute for the time consumed by an
1240 * external request.
1241 */
1242 if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1243 RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1244 fr_time_tracking_resume(&request->async->tracking, fr_time());
1245 }
1246 worker_request_time_tracking_end(worker, request, fr_time());
1247
1248 if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1249
1250 RDEBUG3("Request is detached");
1251 } else {
1252 fr_assert_msg(0, "Request is not detachable");
1253 }
1254
1255 return;
1256}
1257
1258/** This is called by the interpreter when it wants to stop a request
1259 *
1260 * The idea is to get the request into the same state it would be in
1261 * if the interpreter had just finished with it.
1262 */
1263static void _worker_request_stop(request_t *request, void *uctx)
1264{
1265 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1266
1267 RDEBUG3("Cleaning up request execution state");
1268
1269 /*
1270 * Make sure time tracking is always in a
1271 * consistent state when we mark the request
1272 * as done.
1273 */
1274 if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1275 RDEBUG3("Forcing time tracking to running state, from yielded, for request stop");
1276 fr_time_tracking_resume(&request->async->tracking, fr_time());
1277 }
1278
1279 /*
1280 * If the request is in the runnable queue
1281 * yank it back out, so it's not "runnable"
1282 * when we call request done.
1283 */
1284 if (fr_heap_entry_inserted(request->runnable)) fr_heap_extract(&worker->runnable, request);
1285}
1286
1287/** Request is now runnable
1288 *
1289 */
1290static void _worker_request_runnable(request_t *request, void *uctx)
1291{
1292 fr_worker_t *worker = uctx;
1293
1294 RDEBUG3("Request marked as runnable");
1295 fr_heap_insert(&worker->runnable, request);
1296}
1297
1298/** Interpreter yielded request
1299 *
1300 */
1301static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1302{
1303 RDEBUG3("Request yielded");
1304 if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
1305}
1306
1307/** Interpreter is starting to work on request again
1308 *
1309 */
1310static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1311{
1312 RDEBUG3("Request resuming");
1313 if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
1314}
1315
1316/** Check if a request is scheduled
1317 *
1318 */
1319static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1320{
1321 return fr_heap_entry_inserted(request->runnable);
1322}
1323
1324/** Run a request
1325 *
1326 * Until it either yields, or is done.
1327 *
1328 * This function is also responsible for sending replies, and
1329 * cleaning up the request.
1330 *
1331 * @param[in] worker the worker
1332 * @param[in] start the current time
1333 */
1334static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
1335{
1336 request_t *request;
1337 fr_time_t now;
1338
1339 WORKER_VERIFY;
1340
1341 now = start;
1342
1343 /*
1344 * Busy-loop running requests for 1ms. We still poll the
1345 * event loop 1000 times a second, OR when there's no
1346 * more work to do. This allows us to make progress with
1347 * ongoing requests, at the expense of sometimes ignoring
1348 * new ones.
1349 */
1350 while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
1351 ((request = fr_heap_pop(&worker->runnable)) != NULL)) {
1352
1353 REQUEST_VERIFY(request);
1354 fr_assert(!fr_heap_entry_inserted(request->runnable));
1355
1356 /*
1357 * For real requests, if the channel is gone,
1358 * just stop the request and free it.
1359 */
1360 if (request->async->channel && !fr_channel_active(request->async->channel)) {
1361 worker_stop_request(&request);
1362 return;
1363 }
1364
1365 (void) unlang_interpret(request, UNLANG_REQUEST_RESUME);
1366
1367 now = fr_time();
1368 }
1369}
1370
1371/** Create a worker
1372 *
1373 * @param[in] ctx the talloc context
1374 * @param[in] el the event list
1375 * @param[in] name the name of this worker
1376 * @param[in] logger the destination for all logging messages
1377 * @param[in] lvl log level
1378 * @param[in] config various configuration parameters
1379 * @return
1380 * - NULL on error
1381 * - fr_worker_t on success
1382 */
1383fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
1384 fr_worker_config_t *config)
1385{
1386 fr_worker_t *worker;
1387
1388 worker = talloc_zero(ctx, fr_worker_t);
1389 if (!worker) {
1390nomem:
1391 fr_strerror_const("Failed allocating memory");
1392 return NULL;
1393 }
1394
1395 worker->name = talloc_strdup(worker, name); /* thread locality */
1396
1398
1399 if (config) worker->config = *config;
1400
1401#define CHECK_CONFIG(_x, _min, _max) do { \
1402 if (!worker->config._x) worker->config._x = _min; \
1403 if (worker->config._x < _min) worker->config._x = _min; \
1404 if (worker->config._x > _max) worker->config._x = _max; \
1405 } while (0)
1406
1407#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
1408 if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
1409 if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
1410 } while (0)
1411
1412 CHECK_CONFIG(max_requests, 1024, (1 << 30));
1413 CHECK_CONFIG(max_channels, 64, 1024);
1414 CHECK_CONFIG(reuse.child_pool_size, 4096, 65536);
1415 CHECK_CONFIG(message_set_size, 1024, 8192);
1416 CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
1417 CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));
1418
1419 worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
1420 if (!worker->channel) {
1421 talloc_free(worker);
1422 goto nomem;
1423 }
1424
1425 worker->thread_id = pthread_self();
1426 worker->el = el;
1427 worker->log = logger;
1428 worker->lvl = lvl;
1429
1430 /*
1431 * The worker thread starts now. Manually initialize it,
1432 * because we're tracking request time, not the time that
1433 * the worker thread is running.
1434 */
1435 memset(&worker->tracking, 0, sizeof(worker->tracking));
1436
1437 worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
1438 if (!worker->aq_control) {
1439 fr_strerror_const("Failed creating atomic queue");
1440 fail:
1441 talloc_free(worker);
1442 return NULL;
1443 }
1444
1445 worker->control = fr_control_create(worker, el, worker->aq_control);
1446 if (!worker->control) {
1447 fr_strerror_const_push("Failed creating control plane");
1448 goto fail;
1449 }
1450
1451 if (fr_control_callback_add(worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
1452 fr_strerror_const_push("Failed adding control channel");
1453 goto fail;
1454 }
1455
1456 if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN, worker, worker_listen_cancel_callback) < 0) {
1457 fr_strerror_const_push("Failed adding callback for listeners");
1458 goto fail;
1459 }
1460
1461 worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0);
1462 if (!worker->runnable) {
1463 fr_strerror_const("Failed creating runnable heap");
1464 goto fail;
1465 }
1466
1467 worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl);
1468 if (!worker->timeout) {
1469 fr_strerror_const("Failed creating timeouts list");
1470 goto fail;
1471 }
1472
1473 worker->timeout_custom = fr_timer_list_lst_alloc(worker, el->tl);
1474 if (!worker->timeout_custom) {
1475 fr_strerror_const("Failed creating custom timeouts list");
1476 goto fail;
1477 }
1478
1479 worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
1480 if (!worker->dedup) {
1481 fr_strerror_const("Failed creating de_dup tree");
1482 goto fail;
1483 }
1484
1485 worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
1486 if (!worker->listeners) {
1487 fr_strerror_const("Failed creating listener tree");
1488 goto fail;
1489 }
1490
1491 worker->intp = unlang_interpret_init(worker, el,
1492 &(unlang_request_func_t){
1493 .init_internal = _worker_request_internal_init,
1494
1495 .done_external = _worker_request_done_external,
1496 .done_internal = _worker_request_done_internal,
1497 .done_detached = _worker_request_done_detached,
1498
1499 .detach = _worker_request_detach,
1500 .stop = _worker_request_stop,
1501 .yield = _worker_request_yield,
1502 .resume = _worker_request_resume,
1503 .mark_runnable = _worker_request_runnable,
1504
1505 .scheduled = _worker_request_scheduled
1506 },
1507 worker);
1508 if (!worker->intp){
1509 fr_strerror_const("Failed initialising interpreter");
1510 goto fail;
1511 }
1512
1513 {
1516
1517 if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL,
1518 UNCONST(void *, worker), true, false))) {
1519 fr_strerror_const("Failed creating request slab list");
1520 goto fail;
1521 }
1522 }
1523
1524 unlang_interpret_set_thread_default(worker->intp);
1525
1526 return worker;
1527}
1528
1529
1530/** The main loop and entry point of the stand-alone worker thread.
1531 *
1532 * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
1533 * instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
1534 * request.
1535 *
1536 * @param[in] worker the worker data structure to manage
1537 */
1538 void fr_worker(fr_worker_t *worker)
1539{
1540 WORKER_VERIFY;
1541
1542 while (true) {
1543 bool wait_for_event;
1544 int num_events;
1545
1546 WORKER_VERIFY;
1547
1548 /*
1549 * There are runnable requests. We still service
1550 * the event loop, but we don't wait for events.
1551 */
1552 wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
1553 if (wait_for_event) {
1554 if (worker->exiting && (worker_num_requests(worker) == 0)) break;
1555
1556 DEBUG4("Ready to process requests");
1557 }
1558
1559 /*
1560 * Check the event list. If there's an error
1561 * (e.g. exit), we stop looping and clean up.
1562 */
1563 DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "will not wait");
1564 num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
1565 if (num_events < 0) {
1566 if (fr_event_loop_exiting(worker->el)) {
1567 DEBUG4("Event loop exiting");
1568 break;
1569 }
1570
1571 PERROR("Failed retrieving events");
1572 break;
1573 }
1574
1575 DEBUG4("%u event(s) pending", num_events);
1576
1577 /*
1578 * Service outstanding events.
1579 */
1580 if (num_events > 0) {
1581 DEBUG4("Servicing event(s)");
1582 fr_event_service(worker->el);
1583 }
1584
1585 /*
1586 * Run any outstanding requests.
1587 */
1588 worker_run_request(worker, fr_time());
1589 }
1590}
1591
1592/** Pre-event handler
1593 *
1594 * This should be run ONLY in single-threaded mode!
1595 */
1596 int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
1597{
1598 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1599 request_t *request;
1600
1601 request = fr_heap_peek(worker->runnable);
1602 if (!request) return 0;
1603
1604 /*
1605 * There's work to do. Tell the event handler to poll
1606 * for IO / timers, but also immediately return to the
1607 * calling function, which has more work to do.
1608 */
1609 return 1;
1610}
1611
1612
1613/** Post-event handler
1614 *
1615 * This should be run ONLY in single-threaded mode!
1616 */
1617 void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
1618{
1619 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1620
1621 worker_run_request(worker, fr_time()); /* Event loop time can be too old, and trigger asserts */
1622}
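/*
 *	A single-threaded wiring sketch.  This assumes the event list's
 *	fr_event_pre_insert()/fr_event_post_insert() registration functions;
 *	the scheduler may do this differently:
 *
 *	@code
 *	fr_event_pre_insert(el, fr_worker_pre_event, worker);
 *	fr_event_post_insert(el, fr_worker_post_event, worker);
 *	@endcode
 *
 *	The pre-event callback tells the event loop not to block when requests
 *	are runnable; the post-event callback then runs them.
 */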
1623
1624/** Print debug information about the worker structure
1625 *
1626 * @param[in] worker the worker
1627 * @param[in] fp the file where the debug output is printed.
1628 */
1629void fr_worker_debug(fr_worker_t *worker, FILE *fp)
1630{
1631 WORKER_VERIFY;
1632
1633 fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
1634 fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);
1635
1636 fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
1637 fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
1638 fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
1639 fr_time_delta_unwrap(worker->tracking.running_total) / (worker->stats.in ? worker->stats.in : 1));
1640
1641 fr_time_tracking_debug(&worker->tracking, fp);
1642
1643}
1644
1645/** Create a channel to the worker
1646 *
1647 * Called by the master (i.e. network) thread when it needs to create
1648 * a new channel to a particular worker.
1649 *
1650 * @param[in] worker the worker
1651 * @param[in] master the control plane of the master
1652 * @param[in] ctx the context in which the channel will be created
1653 */
1654fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
1655{
1656 fr_channel_t *ch;
1657 pthread_t id;
1658 bool same;
1659
1660 WORKER_VERIFY;
1661
1662 id = pthread_self();
1663 same = (pthread_equal(id, worker->thread_id) != 0);
1664
1665 ch = fr_channel_create(ctx, master, worker->control, same);
1666 if (!ch) return NULL;
1667
1668 fr_channel_set_recv_request(ch, worker, worker_recv_request);
1669
1670 /*
1671 * Tell the worker about the channel
1672 */
1673 if (fr_channel_signal_open(ch) < 0) {
1674 talloc_free(ch);
1675 return NULL;
1676 }
1677
1678 return ch;
1679}
1680
1681 int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
1682{
1683 fr_ring_buffer_t *rb;
1684
1685 /*
1686 * Skip a bunch of work if we're already in the worker thread.
1687 */
1688 if (is_worker_thread(worker)) {
1689 return fr_worker_listen_cancel_self(worker, li);
1690 }
1691
1692 rb = fr_worker_rb_init();
1693 if (!rb) return -1;
1694
1695 return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
1696}
1697
1698#ifdef WITH_VERIFY_PTR
1699/** Verify the worker data structures.
1700 *
1701 * @param[in] worker the worker
1702 */
1703static void worker_verify(fr_worker_t *worker)
1704{
1705 int i;
1706
1707 (void) talloc_get_type_abort(worker, fr_worker_t);
1708 fr_atomic_queue_verify(worker->aq_control);
1709
1710 fr_assert(worker->control != NULL);
1711 (void) talloc_get_type_abort(worker->control, fr_control_t);
1712
1713 fr_assert(worker->el != NULL);
1714 (void) talloc_get_type_abort(worker->el, fr_event_list_t);
1715
1716 fr_assert(worker->runnable != NULL);
1717 (void) talloc_get_type_abort(worker->runnable, fr_heap_t);
1718
1719 fr_assert(worker->dedup != NULL);
1720 (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
1721
1722 for (i = 0; i < worker->config.max_channels; i++) {
1723 if (!worker->channel[i].ch) continue;
1724
1725 (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
1726 }
1727}
1728#endif
1729
1730int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
1731{
1732 if (num < 0) return -1;
1733 if (num == 0) return 0;
1734
1735 stats[0] = worker->stats.in;
1736 if (num >= 2) stats[1] = worker->stats.out;
1737 if (num >= 3) stats[2] = worker->stats.dup;
1738 if (num >= 4) stats[3] = worker->stats.dropped;
1739 if (num >= 5) stats[4] = worker->num_naks;
1740 if (num >= 6) stats[5] = worker->num_active;
1741
1742 if (num <= 6) return num;
1743
1744 return 6;
1745}
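/*
 *	Usage sketch, fetching all six counters
 *	(in, out, dup, dropped, naks, active):
 *
 *	@code
 *	uint64_t stats[6];
 *
 *	if (fr_worker_stats(worker, 6, stats) == 6) printf("in %" PRIu64 ", out %" PRIu64 "\n", stats[0], stats[1]);
 *	@endcode
 */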
1746
1747static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
1748{
1749 fr_worker_t const *worker = ctx;
1750 fr_time_delta_t when;
1751
1752 if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
1753 fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
1754 fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
1755 fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
1756 fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
1757 fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
1758 fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
1759 fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
1760 }
1761
1762 if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
1763 when = worker->predicted;
1764 fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1765
1766 when = worker->tracking.running_total;
1767 if (fr_time_delta_ispos(when)) when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
1768 fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1769
1770 when = worker->tracking.running_total;
1771 fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1772
1773 when = worker->tracking.waiting_total;
1774 fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1775
1776 fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
1777 fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
1778 }
1779
1780 return 0;
1781}
1782
1783fr_cmd_table_t cmd_worker_table[] = {
1784 {
1785 .parent = "stats",
1786 .name = "worker",
1787 .help = "Statistics for worker threads.",
1788 .read_only = true
1789 },
1790
1791 {
1792 .parent = "stats worker",
1793 .add_name = true,
1794 .name = "self",
1795 .syntax = "[(count|cpu)]",
1796 .func = cmd_stats_worker,
1797 .help = "Show statistics for a specific worker thread.",
1798 .read_only = true
1799 },
1800
1801 CMD_TABLE_END
1802};
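/*
 *	These entries hang off the "stats" command of the control socket,
 *	i.e. something like the following from radmin (the exact invocation
 *	depends on how the table is registered):
 *
 *	@code
 *	stats worker self count
 *	stats worker self cpu
 *	@endcode
 *
 *	which print the count.* and cpu.* fields emitted by cmd_stats_worker().
 */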
static int const char char buffer[256]
Definition acutest.h:576
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition app_io.h:55
size_t default_reply_size
same for replies
Definition app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define atomic_uint64_t
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition build.h:167
#define RCSID(id)
Definition build.h:485
#define NDEBUG_UNUSED
Definition build.h:328
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition call.c:147
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:938
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:897
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
int argc
current argument count
Definition command.h:39
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_ID_LISTEN
Definition control.h:57
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:138
Head of a doubly linked list.
Definition dlist.h:51
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:322
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for the current request.
Definition interpret.c:784
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:1803
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition interpret.c:1334
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:1834
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize an unlang compiler / interpreter.
Definition interpret.c:1761
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1212
bool unlang_interpret_is_resumable(request_t *request)
Check if a request is resumable.
Definition interpret.c:1400
#define UNLANG_REQUEST_RESUME
Definition interpret.h:42
#define UNLANG_TOP_FRAME
Definition interpret.h:35
External functions provided by the owner of the interpreter.
Definition interpret.h:103
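A sketch of wiring a thread-local interpreter to a request, assuming funcs is a filled-in unlang_request_func_t table (the worker supplies its _worker_request_* callbacks) and assuming UNLANG_REQUEST_RESUME is the appropriate "running" flag for this call:

#include <freeradius-devel/unlang/interpret.h>

static void interp_demo(TALLOC_CTX *ctx, fr_event_list_t *el,
                        unlang_request_func_t *funcs, void *uctx,
                        request_t *request)
{
        unlang_interpret_t *intp;
        rlm_rcode_t rcode;

        intp = unlang_interpret_init(ctx, el, funcs, uctx);
        unlang_interpret_set_thread_default(intp);      /* default for this thread */

        unlang_interpret_set(request, intp);            /* bind request to interpreter */
        rcode = unlang_interpret(request, UNLANG_REQUEST_RESUME);
        (void) rcode;                                   /* how the top frame finished */
}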
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
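A sketch of the control-plane setup these functions imply: create the plane over an atomic queue, register a callback per message ID, and send via a caller-owned ring buffer. The callback signature mirrors worker_channel_callback(); all names here are illustrative:

#include <freeradius-devel/io/control.h>

static void my_ctl_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
        (void) ctx; (void) data; (void) data_size; (void) now;
        /* decode and act on the control message ... */
}

static int control_demo(TALLOC_CTX *ctx, fr_event_list_t *el,
                        fr_atomic_queue_t *aq, fr_ring_buffer_t *rb,
                        void *msg, size_t msg_size)
{
        fr_control_t *c;

        c = fr_control_create(ctx, el, aq);
        if (!c) return -1;

        /* Deliver FR_CONTROL_ID_CHANNEL messages to our callback. */
        if (fr_control_callback_add(c, FR_CONTROL_ID_CHANNEL, NULL, my_ctl_callback) < 0) return -1;

        /* Queue a message for whoever services this control plane. */
        return fr_control_message_send(c, rb, FR_CONTROL_ID_CHANNEL, msg, msg_size);
}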
void const * app_instance
Definition listen.h:39
fr_app_t const * app
Definition listen.h:38
void const * app_io_instance
I/O path configuration context.
Definition listen.h:33
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:41
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
Minimal data structure to use the new code.
Definition listen.h:59
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define RERROR(fmt,...)
Definition log.h:298
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RPERROR(fmt,...)
Definition log.h:302
#define RPEDEBUG(fmt,...)
Definition log.h:376
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:641
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2194
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2059
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2030
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Definition event.c:2383
Stores all information relating to an event list.
Definition event.c:377
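Corral-then-service is the pattern the worker's main loop is built on: gather ready timer and file descriptor events, dispatch them, and repeat until the loop is told to exit. A minimal sketch:

#include <freeradius-devel/util/event.h>

static void event_loop_demo(fr_event_list_t *el)
{
        while (!fr_event_loop_exiting(el)) {
                int num;

                /* Block (wait == true) until at least one event is ready. */
                num = fr_event_corral(el, fr_time(), true);
                if (num < 0) break;                     /* fatal event loop error */

                if (num > 0) fr_event_service(el);      /* dispatch what's ready */
        }
}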
fr_log_lvl_t
Definition log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
unsigned int uint32_t
long int ssize_t
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1238
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:934
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
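These calls form the reserve-then-alloc pattern used when encoding replies: reserve generously, encode into the message data, then shrink the message to the bytes actually written. A sketch with illustrative sizes:

#include <freeradius-devel/io/channel.h>
#include <freeradius-devel/io/message.h>

static void message_demo(TALLOC_CTX *ctx)
{
        fr_message_set_t *ms;
        fr_message_t *m;

        ms = fr_message_set_create(ctx, 1024, sizeof(fr_channel_data_t), 65536);
        if (!ms) return;

        m = fr_message_reserve(ms, 4096);       /* reserve more than we expect to need */
        if (!m) return;

        /* ... encode up to 4096 bytes into m->data ... */

        m = fr_message_alloc(ms, m, 256);       /* shrink to the 256 bytes written */

        (void) fr_message_done(m);              /* consumer marks the message done */
}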
static const conf_parser_t config[]
Definition base.c:183
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
Definition radclient.h:52
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree that verifies elements are of a specific talloc type.
Definition rb.h:246
The main red black tree structure.
Definition rb.h:73
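A sketch of the embedded-node red-black tree pattern the worker's listeners tree uses: the fr_rb_node_t linkage lives inside the element, and the comparator orders by an opaque key. track_t is illustrative, and elements must be talloc'd as track_t for the verifying allocator:

#include <freeradius-devel/util/rb.h>

typedef struct {
        fr_rb_node_t    node;   /* linkage inside the tree */
        void const      *key;
} track_t;

static int8_t track_cmp(void const *one, void const *two)
{
        track_t const *a = one, *b = two;
        return (a->key > b->key) - (a->key < b->key);
}

static void rb_demo(TALLOC_CTX *ctx, track_t *t)        /* t must be talloc'd as track_t */
{
        fr_rb_tree_t *tree;

        tree = fr_rb_inline_talloc_alloc(ctx, track_t, node, track_cmp, NULL);
        if (!tree) return;

        (void) fr_rb_insert(tree, t);
        if (fr_rb_find(tree, t)) (void) fr_rb_delete(tree, t);  /* NULL free fn: just unlinks */
}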
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:40
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
Definition request.c:378
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:535
#define REQUEST_VERIFY(_x)
Definition request.h:305
#define request_is_detached(_x)
Definition request.h:186
#define request_is_external(_x)
Definition request.h:184
#define REQUEST_POOL_HEADERS
Definition request.h:66
#define request_is_internal(_x)
Definition request.h:185
@ REQUEST_TYPE_EXTERNAL
A request received on the wire.
Definition request.h:178
#define request_is_detachable(_x)
Definition request.h:187
#define request_init(_ctx, _type, _args)
Definition request.h:317
#define REQUEST_POOL_SIZE
Definition request.h:79
@ REQUEST_STOP_PROCESSING
Request has been signalled to stop.
Definition request.h:88
Optional arguments for initialising requests.
Definition request.h:283
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
@ FR_SIGNAL_DUP
A duplicate request was received.
Definition signal.h:44
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
Definition slab.h:120
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
Definition slab.h:72
unsigned int num_children
How many child allocations are expected off each element.
Definition slab.h:48
size_t child_pool_size
Size of pool space to be allocated to each element.
Definition slab.h:49
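The request_slab_list_t type referenced later in this index is generated by these paired macros. A sketch of the instantiation, assuming the conventional _name/_type pairing the generated names suggest:

#include <freeradius-devel/util/slab.h>

/* Generates request_slab_list_t and the other wrapper structs ... */
FR_SLAB_TYPES(request, request_t)
/* ... and the matching request_slab_* wrapper functions. */
FR_SLAB_FUNCS(request, request_t)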
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:224
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition time.c:547
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition time.c:592
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_eq(_a, _b)
Definition time.h:241
#define fr_time_delta_eq(_a, _b)
Definition time.h:287
#define NSEC
Definition time.h:379
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
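fr_time_t and fr_time_delta_t are opaque nanosecond counts manipulated only through these helpers. A small sketch of the elapsed-time test that timeout logic of this kind relies on:

#include <freeradius-devel/util/time.h>

static bool took_too_long(fr_time_t start, fr_time_delta_t budget)
{
        fr_time_delta_t elapsed = fr_time_sub(fr_time(), start);

        /* e.g. budget = fr_time_delta_from_sec(30) for a 30s limit */
        return !fr_time_delta_lt(elapsed, budget);
}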
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
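A sketch of the time-tracking lifecycle applied to each request: start when it first runs, yield and resume around asynchronous waits, and end when the reply is sent, at which point the running/waiting split is available. worker_tt stands in for the worker's own tracker:

#include <freeradius-devel/server/time_tracking.h>

static void tracking_demo(fr_time_tracking_t *worker_tt,
                          fr_time_tracking_t *tt, fr_time_delta_t *predicted)
{
        fr_time_tracking_start(worker_tt, tt, fr_time());       /* request starts running */
        fr_time_tracking_yield(tt, fr_time());                  /* waiting on I/O */
        fr_time_tracking_resume(tt, fr_time());                 /* runnable again */
        fr_time_tracking_end(predicted, tt, fr_time());         /* done; updates prediction */

        /* tt->running_total and tt->waiting_total now hold the split. */
}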
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
Definition timer.c:980
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
Definition timer.c:1091
int fr_timer_list_force_run(fr_timer_list_t *tl)
Force the events in the timer list to run, even if they are not yet ready to fire.
Definition timer.c:812
fr_timer_list_t * fr_timer_list_lst_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new lst based timer list.
Definition timer.c:1065
An event timer list.
Definition timer.c:49
#define fr_timer_armed(_ev)
Definition timer.h:117
#define fr_timer_in(...)
Definition timer.h:86
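The worker allocates both flavours of timer list: an ordered list for the uniform max_request_time timeouts (every entry is assumed to share the same delay, so insertion order matches fire order) and an lst for arbitrary per-request timeouts. A sketch:

#include <freeradius-devel/util/timer.h>

static void timers_demo(TALLOC_CTX *ctx, fr_timer_list_t *parent)
{
        fr_timer_list_t *timeout        = fr_timer_list_ordered_alloc(ctx, parent);
        fr_timer_list_t *timeout_custom = fr_timer_list_lst_alloc(ctx, parent);

        /* ... arm events on each list ... */

        (void) fr_timer_list_num_events(timeout);               /* pending events per list */
        (void) fr_timer_list_num_events(timeout_custom);
}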
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition compile.c:5164
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:732
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1274
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition worker.c:118
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition worker.c:604
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition worker.c:1301
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition worker.c:254
fr_event_list_t * el
our event list
Definition worker.c:114
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1596
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition worker.c:621
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1654
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition worker.c:127
fr_channel_t * ch
Definition worker.c:87
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition worker.c:1334
static void worker_exit(fr_worker_t *worker)
Definition worker.c:232
#define WORKER_VERIFY
Definition worker.c:75
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition worker.c:139
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition worker.c:1747
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition worker.c:1290
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition worker.c:746
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1383
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition worker.c:1006
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
Definition worker.c:513
fr_worker_channel_t * channel
list of channels
Definition worker.c:142
char const * name
name of this worker
Definition worker.c:100
uint64_t num_active
number of active requests
Definition worker.c:134
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1783
static _Thread_local fr_ring_buffer_t * fr_worker_rb
Definition worker.c:84
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition worker.c:575
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition worker.c:1730
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
Definition worker.c:799
fr_dlist_head_t dlist
Definition worker.c:93
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
Definition worker.c:1199
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition worker.c:1310
fr_rb_tree_t * dedup
de-dup tree
Definition worker.c:125
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition worker.c:110
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition worker.c:420
static void worker_request_name_number(request_t *request)
Definition worker.c:786
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
Definition worker.c:1263
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition worker.c:533
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition worker.c:804
fr_log_t const * log
log destination
Definition worker.c:107
fr_io_stats_t stats
input / output stats
Definition worker.c:129
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition worker.c:1232
static int _fr_worker_rb_free(void *arg)
Definition worker.c:172
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition worker.c:137
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition worker.c:1113
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1029
uint64_t num_naks
number of messages which were nak'd
Definition worker.c:133
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition worker.c:769
fr_worker_config_t config
external configuration
Definition worker.c:101
fr_listen_t const * listener
incoming packets
Definition worker.c:148
unlang_interpret_t * intp
Worker's local interpreter.
Definition worker.c:103
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:371
bool exiting
are we exiting?
Definition worker.c:140
fr_log_lvl_t lvl
log level
Definition worker.c:108
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition worker.c:223
int num_channels
actual number of channels
Definition worker.c:116
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.c:123
static atomic_uint64_t request_number
Definition worker.c:79
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition worker.c:130
fr_rb_node_t node
in tree of listeners
Definition worker.c:150
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1681
int fr_worker_request_timeout_set(fr_worker_t *worker, request_t *request, fr_time_delta_t timeout)
Set, or re-set the request timer.
Definition worker.c:557
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1617
fr_dlist_head_t dlist
of requests associated with this listener.
Definition worker.c:156
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1538
request_slab_list_t * slab
slab allocator for request_t
Definition worker.c:144
static uint32_t worker_num_requests(fr_worker_t *worker)
Definition worker.c:794
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition worker.c:136
pthread_t thread_id
my thread ID
Definition worker.c:105
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition worker.c:213
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition worker.c:131
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition worker.c:160
static bool is_worker_thread(fr_worker_t const *worker)
Definition worker.c:199
fr_timer_list_t * timeout
Track when requests time out using a dlist.
Definition worker.c:120
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition worker.c:181
fr_control_t * control
the control plane
Definition worker.c:112
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition worker.c:1319
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition worker.c:1183
fr_timer_list_t * timeout_custom
Track when requests time out using an lst.
Definition worker.c:121
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition worker.c:1629
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition worker.c:1094
#define CACHE_LINE_SIZE
Definition worker.c:78
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition worker.c:989
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition worker.c:398
A worker which takes packets from a master, and processes them.
Definition worker.c:99
int message_set_size
default start number of messages
Definition worker.h:64
int max_requests
max requests this worker will handle
Definition worker.h:60
int max_channels
maximum number of channels
Definition worker.h:62
fr_slab_config_t reuse
slab allocator configuration
Definition worker.h:69
int ring_buffer_size
default start size for the ring buffers
Definition worker.h:65
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:67
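Putting the configuration struct together with the allocator: a sketch of standing up a worker with explicit limits. Field values are illustrative; in the real server the logger and event list come from the scheduler. The returned worker is then run via fr_worker() on its own thread and torn down with fr_worker_destroy():

#include <freeradius-devel/io/worker.h>

static fr_worker_t *spawn_worker(TALLOC_CTX *ctx, fr_event_list_t *el,
                                 fr_log_t const *logger, fr_log_lvl_t lvl)
{
        fr_worker_config_t cfg = {
                .max_requests     = 1024,                       /* cap on in-flight requests */
                .max_channels     = 64,                         /* cap on network channels */
                .message_set_size = 1024,                       /* starting message count */
                .ring_buffer_size = 1024 * 1024,                /* starting ring buffer size */
                .max_request_time = fr_time_delta_from_sec(30), /* hard per-request limit */
        };

        return fr_worker_alloc(ctx, el, "worker0", logger, lvl, &cfg);
}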