/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: 05bca03d4e15a1eed7b2873d3279d6e926a01da3 $
 *
 * @brief Worker thread functions.
 * @file io/worker.c
 *
 *  The "worker" thread is the one responsible for the bulk of the
 *  work done when processing a request.  Workers are spawned by the
 *  scheduler, and create a kqueue (KQ) and control-plane
 *  Atomic Queue (AQ) for control-plane communication.
 *
 *  When a network thread discovers that it needs more workers, it
 *  asks the scheduler for a KQ/AQ combination.  The network thread
 *  then creates a channel dedicated to that worker, and sends the
 *  channel to the worker in a "new channel" message.  The worker
 *  receives the channel, and sends an ACK back to the network thread.
 *
 *  The network thread then sends the worker new packets, which the
 *  worker receives and processes.
 *
 *  When a packet is decoded, it is put into the "runnable" heap, and
 *  also into the timeout sublist.  The main loop fr_worker() then
 *  pulls new requests off of this heap and runs them.  The main event
 *  loop checks the head of the timeout sublist, and forcefully terminates
 *  any requests which have been running for too long.
 *
 *  If a request is yielded, it is placed onto the yielded list in
 *  the worker "tracking" data structure.
 *
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 */
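
/*
 *	Illustrative sketch (not taken from this file): roughly how a network
 *	thread attaches to a worker using the API defined below.  The "el",
 *	"log", "lvl", "ctx" and "master_control" handles are assumed to come
 *	from the scheduler, and are not defined here.
 *
 *		fr_worker_t *worker = fr_worker_alloc(NULL, el, "worker 0", log, lvl, NULL);
 *
 *		//	Create a channel dedicated to this worker.  This registers
 *		//	worker_recv_request() as the receive callback and sends the
 *		//	"new channel" (FR_CHANNEL_OPEN) message, which the worker
 *		//	ACKs from worker_channel_callback().
 *		fr_channel_t *ch = fr_worker_channel_create(worker, ctx, master_control);
 *
 *		//	Decoded packets then flow to the worker over the channel,
 *		//	and replies come back via fr_channel_send_reply().
 */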

RCSID("$Id: 05bca03d4e15a1eed7b2873d3279d6e926a01da3 $")

#define LOG_PREFIX worker->name
#define LOG_DST worker->log

#include <freeradius-devel/io/channel.h>
#include <freeradius-devel/io/listen.h>
#include <freeradius-devel/io/message.h>
#include <freeradius-devel/io/worker.h>
#include <freeradius-devel/unlang/base.h>
#include <freeradius-devel/unlang/call.h>
#include <freeradius-devel/unlang/interpret.h>
#include <freeradius-devel/server/request.h>
#include <freeradius-devel/server/time_tracking.h>
#include <freeradius-devel/util/dlist.h>
#include <freeradius-devel/util/minmax_heap.h>
#include <freeradius-devel/util/slab.h>
#include <freeradius-devel/util/time.h>
#include <freeradius-devel/util/timer.h>

#include <stdalign.h>

#ifdef WITH_VERIFY_PTR
static void worker_verify(fr_worker_t *worker);
#define WORKER_VERIFY worker_verify(worker)
#else
#define WORKER_VERIFY
#endif

#define CACHE_LINE_SIZE	64
static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;

static _Thread_local fr_ring_buffer_t *fr_worker_rb;

typedef struct {
	fr_channel_t		*ch;			//!< the channel

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;			//!< list of requests associated with this channel
} fr_worker_channel_t;

/**
 *  A worker which takes packets from a master, and processes them.
 */
struct fr_worker_s {
	char const		*name;			//!< name of this worker
	fr_worker_config_t	config;			//!< external configuration

	unlang_interpret_t	*intp;			//!< Worker's local interpreter.

	pthread_t		thread_id;		//!< my thread ID

	fr_log_t const		*log;			//!< log destination
	fr_log_lvl_t		lvl;			//!< log level

	fr_atomic_queue_t	*aq_control;		//!< atomic queue for control messages sent to me

	fr_control_t		*control;		//!< the control plane

	fr_event_list_t		*el;			//!< our event list

	int			num_channels;		//!< actual number of channels

	fr_heap_t		*runnable;		//!< current runnable requests which we've spent time processing

	fr_timer_list_t		*timeout;		//!< Track when requests timeout using a dlist.
	fr_timer_list_t		*timeout_custom;	//!< Track when requests timeout using an lst.
							///< requests must always be in one of these lists.
	fr_time_delta_t		max_request_time;	//!< maximum time a request can be processed

	fr_rb_tree_t		*dedup;			//!< de-dup tree

	fr_rb_tree_t		*listeners;		//!< so we can cancel requests when a listener goes away

	fr_io_stats_t		stats;			//!< input / output stats
	fr_time_elapsed_t	cpu_time;		//!< histogram of total CPU time per request
	fr_time_elapsed_t	wall_clock;		//!< histogram of wall clock time per request

	uint64_t		num_naks;		//!< number of messages which were nak'd
	uint64_t		num_active;		//!< number of active requests

	fr_time_delta_t		predicted;		//!< How long we predict a request will take to execute.
	fr_time_tracking_t	tracking;		//!< how much time the worker has spent doing things.

	bool			was_sleeping;		//!< used to suppress multiple sleep signals in a row
	bool			exiting;		//!< are we exiting?

	fr_worker_channel_t	*channel;		//!< list of channels

	request_slab_list_t	*slab;			//!< slab allocator for request_t
};

typedef struct {
	fr_listen_t const	*listener;		//!< incoming packets

	fr_rb_node_t		node;			//!< in tree of listeners

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;			//!< of requests associated with this listener.
} fr_worker_listen_t;


static int8_t worker_listener_cmp(void const *one, void const *two)
{
	fr_worker_listen_t const *a = one, *b = two;

	return CMP(a->listener, b->listener);
}


/*
 *	Explicitly cleanup the memory allocated to the ring buffer,
 *	just in case valgrind complains about it.
 */
static int _fr_worker_rb_free(void *arg)
{
	return talloc_free(arg);
}

/** Initialise thread local storage
 *
 * @return fr_ring_buffer_t for messages
 */
static fr_ring_buffer_t *fr_worker_rb_init(void)
{
	fr_ring_buffer_t *rb;

	rb = fr_worker_rb;
	if (rb) return rb;

	rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
	if (!rb) {
		fr_perror("Failed allocating memory for worker ring buffer");
		return NULL;
	}

	fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);

	return rb;
}

static inline bool is_worker_thread(fr_worker_t const *worker)
{
	return (pthread_equal(pthread_self(), worker->thread_id) != 0);
}

static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now);

/** Callback which handles a message being received on the worker side.
 *
 * @param[in] ctx	the worker
 * @param[in] ch	the channel to drain
 * @param[in] cd	the message (if any) to start with
 */
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
{
	fr_worker_t *worker = ctx;

	worker->stats.in++;
	DEBUG3("Received request %" PRIu64 "", worker->stats.in);
	cd->channel.ch = ch;
	worker_request_bootstrap(worker, cd, fr_time());
}

static void worker_requests_cancel(fr_worker_channel_t *ch)
{
	request_t *request;

	while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
		unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
	}
}

static void worker_exit(fr_worker_t *worker)
{
	worker->exiting = true;

	/*
	 *	Don't allow the post event to run
	 *	any more requests.  They'll be
	 *	signalled to stop before we exit.
	 *
	 *	This only has an effect in single
	 *	threaded mode.
	 */
	(void) fr_event_post_delete(worker->el, fr_worker_post_event, worker);
}

/** Handle a control plane message sent to the worker via a channel
 *
 * @param[in] ctx	the worker
 * @param[in] data	the message
 * @param[in] data_size	size of the data
 * @param[in] now	the current time
 */
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	int			i;
	bool			ok, was_sleeping;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	fr_channel_event_t	ce;
	fr_worker_t		*worker = ctx;

	was_sleeping = worker->was_sleeping;
	worker->was_sleeping = false;

	/*
	 *	We were woken up by a signal to do something.  We're
	 *	not sleeping.
	 */
	ce = fr_channel_service_message(now, &ch, data, data_size);
	DEBUG3("Channel %s",
	       fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
	switch (ce) {
	case FR_CHANNEL_ERROR:
		return;

	case FR_CHANNEL_EMPTY:
		return;

	case FR_CHANNEL_NOOP:
		return;

	case FR_CHANNEL_DATA_READY_REQUESTOR:
		fr_assert(0 == 1);
		break;

	case FR_CHANNEL_DATA_READY_RESPONDER:
		fr_assert(ch != NULL);

		if (!fr_channel_recv_request(ch)) {
			worker->was_sleeping = was_sleeping;

		} else while (fr_channel_recv_request(ch));
		break;

	case FR_CHANNEL_OPEN:
		fr_assert(ch != NULL);

		ok = false;
		for (i = 0; i < worker->config.max_channels; i++) {
			fr_assert(worker->channel[i].ch != ch);

			if (worker->channel[i].ch != NULL) continue;

			worker->channel[i].ch = ch;
			fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);

			DEBUG3("Received channel %p into array entry %d", ch, i);

			ms = fr_message_set_create(worker, worker->config.message_set_size,
						   sizeof(fr_channel_data_t),
						   worker->config.ring_buffer_size);
			fr_assert(ms != NULL);
			fr_channel_responder_uctx_add(ch, ms);

			worker->num_channels++;
			ok = true;
			break;
		}

		fr_cond_assert(ok);
		break;

	case FR_CHANNEL_CLOSE:
		fr_assert(ch != NULL);

		ok = false;

		/*
		 *	Locate the signalling channel in the list
		 *	of channels.
		 */
		for (i = 0; i < worker->config.max_channels; i++) {
			if (!worker->channel[i].ch) continue;

			if (worker->channel[i].ch != ch) continue;

			worker_requests_cancel(&worker->channel[i]);

			fr_cond_assert_msg(fr_channel_responder_ack_close(ch) == 0,
					   "Network added messages to channel after sending FR_CHANNEL_CLOSE");

			ms = fr_channel_responder_uctx_get(ch);
			fr_assert(ms != NULL);
			talloc_free(ms);

			worker->channel[i].ch = NULL;

			fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
			fr_assert(worker->num_channels > 0);

			worker->num_channels--;
			ok = true;
			break;
		}

		fr_cond_assert(ok);

		/*
		 *	Our last input channel closed,
		 *	time to die.
		 */
		if (worker->num_channels == 0) worker_exit(worker);
		break;
	}
}

int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
{
	fr_worker_listen_t	*wl;
	request_t		*request;

	wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
	if (!wl) return -1;

	while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
		RDEBUG("Canceling request due to socket being closed");
		unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
	}

	(void) fr_rb_delete(worker->listeners, wl);
	talloc_free(wl);

	return 0;
}


/** A socket is going away, so clean up any requests which use this socket.
 *
 * @param[in] ctx	the worker
 * @param[in] data	the message
 * @param[in] data_size	size of the data
 * @param[in] now	the current time
 */
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
{
	fr_listen_t const *li;
	fr_worker_t *worker = ctx;

	fr_assert(data_size == sizeof(li));

	memcpy(&li, data, sizeof(li));

	(void) fr_worker_listen_cancel_self(worker, li);
}

/** Send a NAK to the network thread
 *
 * The network thread believes that a worker is running a request until that request has been NAK'd.
 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
 * or there was an error executing the request.
 *
 * @param[in] worker	the worker
 * @param[in] cd	the message to NAK
 * @param[in] now	when the message is NAKd
 */
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	size_t			size;
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	fr_listen_t		*listen;

	worker->num_naks++;

	/*
	 *	Cache the outbound channel.  We'll need it later.
	 */
	ch = cd->channel.ch;
	listen = cd->listen;

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
		fr_message_done(&cd->m);
		return;
	}

	ms = fr_channel_responder_uctx_get(ch);
	fr_assert(ms != NULL);

	size = listen->app_io->default_reply_size;
	if (!size) size = listen->app_io->default_message_size;

	/*
	 *	Allocate a default message size.
	 */
	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode a NAK
	 */
	if (listen->app_io->nak) {
		size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
					   cd->m.data_size, reply->m.data, reply->m.rb_size);
	} else {
		size = 1;	/* rely on them to figure it the heck out */
	}

	(void) fr_message_alloc(ms, &reply->m, size);

	/*
	 *	Fill in the NAK.
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = fr_time_delta_from_sec(10); /* @todo - set to something better? */
	reply->reply.request_time = cd->request.recv_time;

	reply->listen = cd->listen;
	reply->packet_ctx = cd->packet_ctx;

	/*
	 *	Mark the original message as done.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		DEBUG2("Failed sending reply to channel");
	}

	worker->stats.out++;
}

/** Signal the unlang interpreter that it needs to stop running the request
 *
 * Signalling is a synchronous operation.  Whatever I/O requests the request
 * is currently performing are immediately cancelled, and all the frames are
 * popped off the unlang stack.
 *
 * Modules and unlang keywords explicitly register signal handlers to deal
 * with their yield points being cancelled/interrupted via this function.
 *
 * The caller should assume the request is no longer viable after calling
 * this function.
 *
 * @param[in] request	request to cancel.  The request may still run to completion.
 */
static void worker_stop_request(request_t *request)
{
	/*
	 *	Also marks the request as done and runs
	 *	the internal/external callbacks.
	 */
	unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
}

/** Enforce max_request_time
 *
 * Run periodically, and tries to clean up requests which were received by the network
 * thread more than max_request_time seconds ago.  In the interest of not adding a
 * timer for every packet, the requests are given a 1 second leeway.
 *
 * @param[in] tl	the worker's timer list.
 * @param[in] when	the current time
 * @param[in] uctx	the request_t timing out.
 */
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
{
	request_t *request = talloc_get_type_abort(uctx, request_t);

	/*
	 *	Waiting too long, delete it.
	 */
	REDEBUG("Request has reached max_request_time - signalling it to stop");
	worker_stop_request(request);

	/*
	 *	This ensures the finally section can run timeout specific policies
	 */
	request->rcode = RLM_MODULE_TIMEOUT;
}

/** Set, or re-set the request timer
 *
 * Automatically moves requests between the timer lists (timeout, timeout_custom).
 *
 * Can be used to set the initial timeout, or extend the timeout of a request.
 *
 * @param[in] worker	the worker containing the timeout lists.
 * @param[in] request	that we're timing out.
 * @param[in] timeout	the timeout to set.
 * @return
 *	- 0 on success.
 *	- -1 on failure.
 */
int fr_worker_request_timeout_set(fr_worker_t *worker, request_t *request, fr_time_delta_t timeout)
{
	fr_timer_list_t *tl = fr_time_delta_eq(worker->config.max_request_time, timeout) ? worker->timeout : worker->timeout_custom;

	/* No need to disarm, fr_timer_in() does it for us */
	if (unlikely(fr_timer_in(request, tl, &request->timeout, timeout,
				 true, _worker_request_timeout, request) < 0)) {
		RERROR("Failed to create request timeout timer");
		return -1;
	}

	return 0;
}

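/*
 *	Illustrative use (hypothetical caller): extending a request's timeout
 *	past the default.  Assuming max_request_time is not 60s, the request
 *	is parked in the lst-based "timeout_custom" list rather than the
 *	ordered "timeout" list, which only stays sorted because every entry
 *	there uses the same max_request_time delta.
 *
 *		if (fr_worker_request_timeout_set(worker, request,
 *						  fr_time_delta_from_sec(60)) < 0) {
 *			RERROR("Failed extending request timeout");
 *		}
 */
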
/** Start time tracking for a request, and mark it as runnable.
 *
 */
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	/*
	 *	New requests are inserted into the time order heap in
	 *	strict time priority.  Once they are in the list, they
	 *	are only removed when the request is done / free'd.
	 */
	fr_assert(!fr_timer_armed(request->timeout));

	if (unlikely(fr_worker_request_timeout_set(worker, request, worker->config.max_request_time) < 0)) {
		RERROR("Failed to set request timeout");
		return -1;
	}

	/*
	 *	Bootstrap the async state machine with the initial
	 *	state of the request.
	 */
	RDEBUG3("Time tracking started in yielded state");
	fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
	fr_time_tracking_yield(&request->async->tracking, now);
	worker->num_active++;

	fr_assert(!fr_heap_entry_inserted(request->runnable));
	(void) fr_heap_insert(&worker->runnable, request);

	return 0;
}

static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	RDEBUG3("Time tracking ended");
	fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
	fr_assert(worker->num_active > 0);
	worker->num_active--;

	TALLOC_FREE(request->timeout);	/* Disarm the request timer */
}

/** Send a response packet to the network side
 *
 * @param[in] worker		This worker.
 * @param[in] request		we're sending a reply for.
 * @param[in] send_reply	whether the network side sends a reply
 * @param[in] now		The current time
 */
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
{
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	size_t			size = 1;

	REQUEST_VERIFY(request);

	/*
	 *	If we're sending a reply, then it's no longer runnable.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	if (send_reply) {
		size = request->async->listen->app_io->default_reply_size;
		if (!size) size = request->async->listen->app_io->default_message_size;
	}

	/*
	 *	Allocate and send the reply.
	 */
	ch = request->async->channel;
	fr_assert(ch != NULL);

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
		return;
	}

	ms = fr_channel_responder_uctx_get(ch);
	fr_assert(ms != NULL);

	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode it, if required.
	 */
	if (send_reply) {
		ssize_t slen = 0;
		fr_listen_t const *listen = request->async->listen;

		if (listen->app_io->encode) {
			slen = listen->app_io->encode(listen->app_io_instance, request,
						      reply->m.data, reply->m.rb_size);
		} else if (listen->app->encode) {
			slen = listen->app->encode(listen->app_instance, request,
						   reply->m.data, reply->m.rb_size);
		}
		if (slen < 0) {
			RPERROR("Failed encoding request");
			*reply->m.data = 0;
			slen = 1;
		}

		/*
		 *	Shrink the buffer to the actual packet size.
		 *
		 *	This will ALWAYS return the same message as we put in.
		 */
		fr_assert((size_t) slen <= reply->m.rb_size);
		(void) fr_message_alloc(ms, &reply->m, slen);
	}

	/*
	 *	Fill in the rest of the fields in the channel message.
	 *
	 *	sequence / ack will be filled in by fr_channel_send_reply()
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = request->async->tracking.running_total;
	reply->reply.request_time = request->async->recv_time;

	reply->listen = request->async->listen;
	reply->packet_ctx = request->async->packet_ctx;

	/*
	 *	Update the various timers.
	 */
	fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
	fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);

	RDEBUG("Finished request");

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		/*
		 *	Should only happen if the TO_REQUESTOR
		 *	channel is full, or it's not yet active.
		 *
		 *	Not much we can do except complain
		 *	loudly and cleanup the request.
		 */
		RPERROR("Failed sending reply to network thread");
	}

	worker->stats.out++;

	fr_assert(!fr_timer_armed(request->timeout));
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	fr_dlist_entry_unlink(&request->listen_entry);

#ifndef NDEBUG
	request->async->el = NULL;
	request->async->channel = NULL;
	request->async->packet_ctx = NULL;
	request->async->listen = NULL;
#endif
}

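/*
 *	Illustrative app encode callback (hypothetical protocol; signature
 *	assumed from the calls above, not taken from this file): write at most
 *	buffer_len bytes of the reply into buffer and return the length
 *	written, or <0 on error, in which case worker_send_reply() falls back
 *	to a single zero byte.
 *
 *		static ssize_t my_proto_encode(void const *instance, request_t *request,
 *					       uint8_t *buffer, size_t buffer_len)
 *		{
 *			if (buffer_len < 1) return -1;
 *			buffer[0] = 0x02;		// hypothetical "accept" packet code
 *			return 1;
 *		}
 */
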
/*
 *	talloc_typed_asprintf() is horrifically slow for printing
 *	simple numbers.
 */
static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
{
	char buffer[32];
	char *p;
	char const *numbers = "0123456789";

	p = buffer + 30;
	*(p--) = '\0';

	while (number > 0) {
		*(p--) = numbers[number % 10];
		number /= 10;
	}

	if (p[1]) return talloc_strdup(ctx, p + 1);

	return talloc_strdup(ctx, "0");
}

/** Initialize various request fields needed by the worker.
 *
 */
static inline CC_HINT(always_inline)
void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	/*
	 *	For internal requests request->packet
	 *	and request->reply are already populated.
	 */
	if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
	if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));

	request->packet->timestamp = now;
	request->async = talloc_zero(request, fr_async_t);
	request->async->recv_time = now;
	request->async->el = worker->el;
	fr_dlist_entry_init(&request->async->entry);
}

static inline CC_HINT(always_inline)
void worker_request_name_number(request_t *request)
{
	request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
	if (request->name) talloc_const_free(request->name);
	request->name = itoa_internal(request, request->number);
}

static inline CC_HINT(always_inline)
uint32_t worker_num_requests(fr_worker_t *worker)
{
	return fr_timer_list_num_events(worker->timeout) + fr_timer_list_num_events(worker->timeout_custom);
}

static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
{
	return request_slab_deinit(request);
}

static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	int		ret = -1;
	request_t	*request;
	TALLOC_CTX	*ctx;
	fr_listen_t	*listen = cd->listen;

	if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests");
		goto nak;
	}

	/*
	 *	Receive a message to the worker queue, and decode it
	 *	to a request.
	 */
	fr_assert(listen != NULL);

	ctx = request = request_slab_reserve(worker->slab);
	if (!request) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request");
		goto nak;
	}
	/*
	 *	Ensures that both the deinit function runs AND
	 *	the request is returned to the slab if something
	 *	calls talloc_free() on it.
	 */
	request_slab_element_set_destructor(request, _worker_request_deinit, worker);

	/*
	 *	Have to initialise the request manually because the namespace
	 *	changes based on the listener that allocated it.
	 */
	if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) {
		request_slab_release(request);
		goto nak;
	}

	/*
	 *	Do normal worker init that's shared between internal
	 *	and external requests.
	 */
	worker_request_init(worker, request, now);
	worker_request_name_number(request);

	/*
	 *	Associate our interpreter with the request
	 */
	unlang_interpret_set(request, worker->intp);

	request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */

	/*
	 *	Update the transport-specific fields.
	 */
	request->async->channel = cd->channel.ch;

	request->async->recv_time = cd->request.recv_time;

	request->async->listen = listen;
	request->async->packet_ctx = cd->packet_ctx;
	request->async->priority = cd->priority;

	/*
	 *	Now that the "request" structure has been initialized, go decode the packet.
	 *
	 *	Note that this also sets the "async process" function.
	 */
	if (listen->app->decode) {
		ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
	} else if (listen->app_io->decode) {
		ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
	}

	if (ret < 0) {
		talloc_free(ctx);
nak:
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	Set the entry point for this virtual server.
	 */
	if (unlang_call_push(NULL, request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
		RERROR("Protocol failed to set 'process' function");
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	We're done with this message.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Look for conflicting / duplicate packets, but only if
	 *	requested to do so.
	 */
	if (request->async->listen->track_duplicates) {
		request_t *old;

		old = fr_rb_find(worker->dedup, request);
		if (!old) {
			goto insert_new;
		}

		fr_assert(old->async->listen == request->async->listen);
		fr_assert(old->async->channel == request->async->channel);

		/*
		 *	There's a new packet.  Do we keep the old one,
		 *	or the new one?  This decision is made by
		 *	checking the recv_time, which is a
		 *	nanosecond-resolution timer.  If the time is
		 *	identical, then the new packet is the same as
		 *	the old one.
		 *
		 *	If the new packet is a duplicate of the old
		 *	one, then we can just discard the new one.  We
		 *	have to tell the channel that we've "eaten"
		 *	this reply, so the sequence number should
		 *	increase.
		 *
		 *	@todo - fix the channel code to do queue
		 *	depth, and not sequence / ack.
		 */
		if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
			RWARN("Discarding duplicate of request (%"PRIu64")", old->number);

			fr_channel_null_reply(request->async->channel);
			request_slab_release(request);

			/*
			 *	Signal there's a dup, and ignore the
			 *	return code.  We don't bother replying
			 *	here, as an FD event or timer will
			 *	wake up the request, and cause it to
			 *	continue.
			 *
			 *	@todo - the old request is NOT
			 *	running, but is yielded.  It MAY clean
			 *	itself up, or do something...
			 */
			unlang_interpret_signal(old, FR_SIGNAL_DUP);
			worker->stats.dup++;
			return;
		}

		/*
		 *	Stop the old request, and decrement the number
		 *	of active requests.
		 */
		RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);

		worker_stop_request(old);
		worker->stats.dropped++;

	insert_new:
		(void) fr_rb_insert(worker->dedup, request);
	}

	worker_request_time_tracking_start(worker, request, now);

	{
		fr_worker_listen_t *wl;

		wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
		if (!wl) {
			MEM(wl = talloc_zero(worker, fr_worker_listen_t));
			fr_dlist_init(&wl->dlist, request_t, listen_entry);
			wl->listener = listen;

			(void) fr_rb_insert(worker->listeners, wl);
		}

		fr_dlist_insert_tail(&wl->dlist, request);
	}
}

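/*
 *	Worked example of the dedup logic above (illustrative values): two
 *	packets arrive with the same (listener, packet_ctx) key.  If their
 *	recv_time values are identical, the second is a retransmit: it is
 *	discarded, fr_channel_null_reply() keeps the sequence numbers in
 *	sync, and stats.dup is incremented.  If the recv_time values differ,
 *	the second packet is a conflict: the old request is stopped,
 *	stats.dropped is incremented, and the new request takes its place
 *	in the dedup tree.
 */
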
/**
 *  Track a request_t in the "runnable" heap.
 *  Higher priorities take precedence, followed by lower sequence numbers
 */
static int8_t worker_runnable_cmp(void const *one, void const *two)
{
	request_t const *a = one, *b = two;
	int ret;

	ret = CMP(b->async->priority, a->async->priority);
	if (ret != 0) return ret;

	ret = CMP(a->async->sequence, b->async->sequence);
	if (ret != 0) return ret;

	return fr_time_cmp(a->async->recv_time, b->async->recv_time);
}

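/*
 *	Ordering sketch for worker_runnable_cmp() (illustrative values):
 *	A = { priority = 2, sequence = 9 } sorts before
 *	B = { priority = 1, sequence = 3 } because higher priority wins.
 *	The lower sequence number, and then the earlier recv_time, only
 *	break ties within the same priority.
 */
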
/**
 *  Track a request_t in the "dedup" tree
 */
static int8_t worker_dedup_cmp(void const *one, void const *two)
{
	int ret;
	request_t const *a = one, *b = two;

	ret = CMP(a->async->listen, b->async->listen);
	if (ret) return ret;

	return CMP(a->async->packet_ctx, b->async->packet_ctx);
}

/** Destroy a worker
 *
 * The input channels are signaled, and local messages are cleaned up.
 *
 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
 * error has occurred on the worker side, and we need to destroy it.
 *
 * We signal all pending requests in the backlog to stop, and tell the
 * network side that it should not send us any more requests.
 *
 * @param[in] worker the worker to destroy.
 */
void fr_worker_destroy(fr_worker_t *worker)
{
	int i, count, ret;

//	WORKER_VERIFY;

	/*
	 *	Stop any new requests running with this interpreter
	 */
	unlang_interpret_set_thread_default(NULL);

	/*
	 *	Destroy all of the active requests.  These are ones
	 *	which are still waiting for timers or file descriptor
	 *	events.
	 */
	count = 0;

	/*
	 *	Force the timeout event to fire for all requests that
	 *	are still running.
	 */
	ret = fr_timer_list_force_run(worker->timeout);
	if (unlikely(ret < 0)) {
		fr_assert_msg(0, "Failed to force run the timeout list");
	} else {
		count += ret;
	}

	ret = fr_timer_list_force_run(worker->timeout_custom);
	if (unlikely(ret < 0)) {
		fr_assert_msg(0, "Failed to force run the custom timeout list");
	} else {
		count += ret;
	}

	DEBUG("Worker is exiting - stopped %u requests", count);

	/*
	 *	Signal the channels that we're closing.
	 *
	 *	The other end owns the channel, and will take care of
	 *	popping messages in the TO_RESPONDER queue, and marking
	 *	them FR_MESSAGE_DONE.  It will ignore the messages in
	 *	the TO_REQUESTOR queue, as we own those.  They will be
	 *	automatically freed when our talloc context is freed.
	 */
	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		worker_requests_cancel(&worker->channel[i]);

		fr_cond_assert_msg(!fr_dlist_head(&worker->channel[i].dlist),
				   "Pending messages in channel after cancelling request");

		fr_channel_responder_ack_close(worker->channel[i].ch);
	}

	talloc_free(worker);
}

/** Initialise an internal request (i.e. one generated by the interpreter)
 *
 */
static void _worker_request_internal_init(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	worker_request_init(worker, request, now);

	/*
	 *	Requests generated by the interpreter
	 *	are always marked up as internal.
	 */
	fr_assert(request_is_internal(request));
	worker_request_time_tracking_start(worker, request, now);
}


/** External request is now complete
 *
 */
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	/*
	 *	All external requests MUST have a listener.
	 */
	fr_assert(request_is_external(request));
	fr_assert(request->async->listen != NULL);

	/*
	 *	Only real packets are in the dedup tree.  And even
	 *	then, only some of the time.
	 */
	if (request->async->listen->track_duplicates) {
		(void) fr_rb_delete(worker->dedup, request);
	}

	/*
	 *	If we're running a real request, then the final
	 *	indentation MUST be zero.  Otherwise we skipped
	 *	something!
	 *
	 *	Also check that the request is NOT marked as
	 *	"yielded", but is in fact done.
	 *
	 *	@todo - check that the stack is at frame 0, otherwise
	 *	more things have gone wrong.
	 */
	fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
		      "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
	fr_assert_msg(!unlang_interpret_is_resumable(request),
		      "Request %s is marked as yielded at end of processing", request->name);
	fr_assert_msg(unlang_interpret_stack_depth(request) == 0,
		      "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
	RDEBUG("Done request");

	/*
	 *	The request is done.  Track that.
	 */
	worker_request_time_tracking_end(worker, request, now);

	/*
	 *	Remove it from the list of requests associated with this channel.
	 */
	if (fr_dlist_entry_in_list(&request->async->entry)) {
		fr_dlist_entry_unlink(&request->async->entry);
	}

	/*
	 *	These conditions are true when the server is
	 *	exiting and we're stopping all the requests.
	 *
	 *	This should never happen otherwise.
	 */
	if (unlikely(!fr_channel_active(request->async->channel))) {
		request_slab_release(request);
		return;
	}

	worker_send_reply(worker, request, !unlang_request_is_cancelled(request), now);
	request_slab_release(request);
}

/** Internal request (i.e. one generated by the interpreter) is now complete
 *
 * Whatever generated the request is now responsible for freeing it.
 */
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	worker_request_time_tracking_end(worker, request, fr_time());

	fr_assert(!fr_heap_entry_inserted(request->runnable));
	fr_assert(!fr_timer_armed(request->timeout));
	fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
}

/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
 *
 * As the request has no parent, there's nothing else to free it, so we
 * have to.
 */
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
{
	/*
	 *	No time tracking for detached requests
	 *	so we don't need to call
	 *	worker_request_time_tracking_end.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	/*
	 *	Normally worker_request_time_tracking_end
	 *	would remove the request from the time
	 *	order heap, but we need to do that for
	 *	detached requests.
	 */
	TALLOC_FREE(request->timeout);

	fr_assert(!fr_dlist_entry_in_list(&request->async->entry));

	/*
	 *	Detached requests have to be freed by us
	 *	as nothing else can free them.
	 *
	 *	All other requests must be freed by the
	 *	code which allocated them.
	 */
	talloc_free(request);
}


/** Make us responsible for running the request
 *
 */
static void _worker_request_detach(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	RDEBUG4("%s - Request detaching", __FUNCTION__);

	if (request_is_detachable(request)) {
		/*
		 *	End the time tracking...  We don't track detached requests,
		 *	because they don't contribute to the time consumed by an
		 *	external request.
		 */
		if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
			RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
			fr_time_tracking_resume(&request->async->tracking, fr_time());
		}
		worker_request_time_tracking_end(worker, request, fr_time());

		if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");

		RDEBUG3("Request is detached");
	} else {
		fr_assert_msg(0, "Request is not detachable");
	}

	return;
}

/** Request is now runnable
 *
 */
static void _worker_request_runnable(request_t *request, void *uctx)
{
	fr_worker_t *worker = uctx;

	RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
	fr_heap_insert(&worker->runnable, request);
}

/** Interpreter yielded request
 *
 */
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
{
	RDEBUG4("%s - Request yielded", __FUNCTION__);
	if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
}

/** Interpreter is starting to work on request again
 *
 */
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
{
	RDEBUG4("%s - Request resuming", __FUNCTION__);
	if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
}

/** Check if a request is scheduled
 *
 */
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
{
	return fr_heap_entry_inserted(request->runnable);
}

/** Update a request's priority
 *
 */
static void _worker_request_prioritise(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	RDEBUG4("%s - Request priority changed", __FUNCTION__);

	/* Extract the request from the runnable queue _if_ it's in the runnable queue */
	if (fr_heap_extract(&worker->runnable, request) < 0) return;

	/* Reinsert it to re-evaluate its new priority */
	fr_heap_insert(&worker->runnable, request);
}

/** Run a request
 *
 * Until it either yields, or is done.
 *
 * This function is also responsible for sending replies, and
 * cleaning up the request.
 *
 * @param[in] worker the worker
 * @param[in] start the current time
 */
static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
{
	request_t *request;
	fr_time_t now;

	WORKER_VERIFY;

	now = start;

	/*
	 *	Busy-loop running requests for 1ms.  We still poll the
	 *	event loop 1000 times a second, OR when there's no
	 *	more work to do.  This allows us to make progress with
	 *	ongoing requests, at the expense of sometimes ignoring
	 *	new ones.
	 */
	while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
	       ((request = fr_heap_pop(&worker->runnable)) != NULL)) {

		REQUEST_VERIFY(request);
		fr_assert(!fr_heap_entry_inserted(request->runnable));

		/*
		 *	For real requests, if the channel is gone,
		 *	just stop the request and free it.
		 */
		if (request->async->channel && !fr_channel_active(request->async->channel)) {
			worker_stop_request(request);
			return;
		}

		(void) unlang_interpret(request, UNLANG_REQUEST_RESUME);

		now = fr_time();
	}
}

/** Create a worker
 *
 * @param[in] ctx the talloc context
 * @param[in] name the name of this worker
 * @param[in] el the event list
 * @param[in] logger the destination for all logging messages
 * @param[in] lvl log level
 * @param[in] config various configuration parameters
 * @return
 *	- NULL on error
 *	- fr_worker_t on success
 */
fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
			     fr_worker_config_t *config)
{
	fr_worker_t *worker;

	worker = talloc_zero(ctx, fr_worker_t);
	if (!worker) {
nomem:
		fr_strerror_const("Failed allocating memory");
		return NULL;
	}

	worker->name = talloc_strdup(worker, name);	/* thread locality */

	if (config) worker->config = *config;

#define CHECK_CONFIG(_x, _min, _max) do { \
		if (!worker->config._x) worker->config._x = _min; \
		if (worker->config._x < _min) worker->config._x = _min; \
		if (worker->config._x > _max) worker->config._x = _max; \
	} while (0)

#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
		if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
		if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
	} while (0)

	CHECK_CONFIG(max_requests, 1024, (1 << 30));
	CHECK_CONFIG(max_channels, 64, 1024);
	CHECK_CONFIG(reuse.child_pool_size, 4096, 65536);
	CHECK_CONFIG(message_set_size, 1024, 8192);
	CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
	CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));

	worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
	if (!worker->channel) {
		talloc_free(worker);
		goto nomem;
	}

	worker->thread_id = pthread_self();
	worker->el = el;
	worker->log = logger;
	worker->lvl = lvl;

	/*
	 *	The worker thread starts now.  Manually initialize it,
	 *	because we're tracking request time, not the time that
	 *	the worker thread is running.
	 */
	memset(&worker->tracking, 0, sizeof(worker->tracking));

	worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
	if (!worker->aq_control) {
		fr_strerror_const("Failed creating atomic queue");
	fail:
		talloc_free(worker);
		return NULL;
	}

	worker->control = fr_control_create(worker, el, worker->aq_control);
	if (!worker->control) {
		fr_strerror_const_push("Failed creating control plane");
		goto fail;
	}

	if (fr_control_callback_add(worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
		fr_strerror_const_push("Failed adding control channel");
		goto fail;
	}

	if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN, worker, worker_listen_cancel_callback) < 0) {
		fr_strerror_const_push("Failed adding callback for listeners");
		goto fail;
	}

	worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0);
	if (!worker->runnable) {
		fr_strerror_const("Failed creating runnable heap");
		goto fail;
	}

	worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl);
	if (!worker->timeout) {
		fr_strerror_const("Failed creating timeouts list");
		goto fail;
	}

	worker->timeout_custom = fr_timer_list_lst_alloc(worker, el->tl);
	if (!worker->timeout_custom) {
		fr_strerror_const("Failed creating custom timeouts list");
		goto fail;
	}

	worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
	if (!worker->dedup) {
		fr_strerror_const("Failed creating de_dup tree");
		goto fail;
	}

	worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
	if (!worker->listeners) {
		fr_strerror_const("Failed creating listener tree");
		goto fail;
	}

	worker->intp = unlang_interpret_init(worker, el,
					     &(unlang_request_func_t){
						.init_internal = _worker_request_internal_init,

						.done_external = _worker_request_done_external,
						.done_internal = _worker_request_done_internal,
						.done_detached = _worker_request_done_detached,

						.detach = _worker_request_detach,
						.yield = _worker_request_yield,
						.resume = _worker_request_resume,
						.mark_runnable = _worker_request_runnable,

						.scheduled = _worker_request_scheduled,
						.prioritise = _worker_request_prioritise
					     },
					     worker);
	if (!worker->intp) {
		fr_strerror_const("Failed initialising interpreter");
		goto fail;
	}

	{
		if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL,
							     UNCONST(void *, worker), true, false))) {
			fr_strerror_const("Failed creating request slab list");
			goto fail;
		}
	}

	unlang_interpret_set_thread_default(worker->intp);

	return worker;
}


/** The main loop and entry point of the stand-alone worker thread.
 *
 *  Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
 *  instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
 *  request.
 *
 * @param[in] worker the worker data structure to manage
 */
void fr_worker(fr_worker_t *worker)
{
	WORKER_VERIFY;

	while (true) {
		bool wait_for_event;
		int num_events;

		WORKER_VERIFY;

		/*
		 *	There are runnable requests.  We still service
		 *	the event loop, but we don't wait for events.
		 */
		wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
		if (wait_for_event) {
			if (worker->exiting && (worker_num_requests(worker) == 0)) break;

			DEBUG4("Ready to process requests");
		}

		/*
		 *	Check the event list.  If there's an error
		 *	(e.g. exit), we stop looping and clean up.
		 */
		DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "will not wait");
		num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
		if (num_events < 0) {
			if (fr_event_loop_exiting(worker->el)) {
				DEBUG4("Event loop exiting");
				break;
			}

			PERROR("Failed retrieving events");
			break;
		}

		DEBUG4("%u event(s) pending", num_events);

		/*
		 *	Service outstanding events.
		 */
		if (num_events > 0) {
			DEBUG4("Servicing event(s)");
			fr_event_service(worker->el);
		}

		/*
		 *	Run any outstanding requests.
		 */
		worker_run_request(worker, fr_time());
	}
}

/** Pre-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	request_t *request;

	request = fr_heap_peek(worker->runnable);
	if (!request) return 0;

	/*
	 *	There's work to do.  Tell the event handler to poll
	 *	for IO / timers, but also immediately return to the
	 *	calling function, which has more work to do.
	 */
	return 1;
}


/** Post-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	worker_run_request(worker, fr_time());	/* Event loop time can be too old, and trigger asserts */
}

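/*
 *	Illustrative single-threaded wiring (assumed scheduler code, not part
 *	of this file): the pre/post event handlers above are registered on the
 *	event list so the loop keeps polling while requests are runnable.  The
 *	registration functions are assumed counterparts of the
 *	fr_event_post_delete() call used in worker_exit().
 *
 *		fr_event_pre_insert(el, fr_worker_pre_event, worker);
 *		fr_event_post_insert(el, fr_worker_post_event, worker);
 */
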
/** Print debug information about the worker structure
 *
 * @param[in] worker the worker
 * @param[in] fp the file where the debug output is printed.
 */
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
{
	WORKER_VERIFY;

	fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
	fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);

	fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
	fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->tracking.running_total) / (worker->stats.in + 1));

	fr_time_tracking_debug(&worker->tracking, fp);
}

/** Create a channel to the worker
 *
 * Called by the master (i.e. network) thread when it needs to create
 * a new channel to a particular worker.
 *
 * @param[in] worker the worker
 * @param[in] master the control plane of the master
 * @param[in] ctx the context in which the channel will be created
 */
fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
{
	fr_channel_t *ch;
	pthread_t id;
	bool same;

	WORKER_VERIFY;

	id = pthread_self();
	same = (pthread_equal(id, worker->thread_id) != 0);

	ch = fr_channel_create(ctx, master, worker->control, same);
	if (!ch) return NULL;

	fr_channel_set_recv_request(ch, worker, worker_recv_request);

	/*
	 *	Tell the worker about the channel
	 */
	if (fr_channel_signal_open(ch) < 0) {
		talloc_free(ch);
		return NULL;
	}

	return ch;
}

int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
{
	fr_ring_buffer_t *rb;

	/*
	 *	Skip a bunch of work if we're already in the worker thread.
	 */
	if (is_worker_thread(worker)) {
		return fr_worker_listen_cancel_self(worker, li);
	}

	rb = fr_worker_rb_init();
	if (!rb) return -1;

	return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
}

#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
 *
 * @param[in] worker the worker
 */
static void worker_verify(fr_worker_t *worker)
{
	int i;

	(void) talloc_get_type_abort(worker, fr_worker_t);
	fr_atomic_queue_verify(worker->aq_control);

	fr_assert(worker->control != NULL);
	(void) talloc_get_type_abort(worker->control, fr_control_t);

	fr_assert(worker->el != NULL);
	(void) talloc_get_type_abort(worker->el, fr_event_list_t);

	fr_assert(worker->runnable != NULL);
	(void) talloc_get_type_abort(worker->runnable, fr_heap_t);

	fr_assert(worker->dedup != NULL);
	(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);

	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		(void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
	}
}
#endif

int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
{
	if (num < 0) return -1;
	if (num == 0) return 0;

	stats[0] = worker->stats.in;
	if (num >= 2) stats[1] = worker->stats.out;
	if (num >= 3) stats[2] = worker->stats.dup;
	if (num >= 4) stats[3] = worker->stats.dropped;
	if (num >= 5) stats[4] = worker->num_naks;
	if (num >= 6) stats[5] = worker->num_active;

	if (num <= 6) return num;

	return 6;
}

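/*
 *	Illustrative caller (hypothetical): fetch all six counters exposed by
 *	fr_worker_stats().  The slot order matches the assignments above.
 *
 *		uint64_t stats[6];
 *
 *		(void) fr_worker_stats(worker, 6, stats);
 *		// stats[0] = in, stats[1] = out, stats[2] = dup,
 *		// stats[3] = dropped, stats[4] = naks, stats[5] = active
 */
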
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
{
	fr_worker_t const *worker = ctx;
	fr_time_delta_t when;

	if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
		fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
		fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
		fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
		fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
		fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
		fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
		fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
	}

	if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
		when = worker->predicted;
		fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.running_total;
		if (fr_time_delta_ispos(when)) when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
		fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.running_total;
		fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.waiting_total;
		fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
		fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
	}

	return 0;
}

fr_cmd_table_t const fr_worker_cmd_table[] = {
	{
		.parent = "stats",
		.name = "worker",
		.help = "Statistics for worker threads.",
		.read_only = true
	},

	{
		.parent = "stats worker",
		.add_name = true,
		.name = "self",
		.syntax = "[(count|cpu)]",
		.func = cmd_stats_worker,
		.help = "Show statistics for a specific worker thread.",
		.read_only = true
	},

	CMD_TABLE_END
};
static int const char char buffer[256]
Definition acutest.h:576
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition app_io.h:55
size_t default_reply_size
same for replies
Definition app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define atomic_uint64_t
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition build.h:167
#define RCSID(id)
Definition build.h:485
#define NDEBUG_UNUSED
Definition build.h:328
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
unlang_action_t unlang_call_push(unlang_result_t *p_result, request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition call.c:147
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:938
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:897
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
int argc
current argument count
Definition command.h:39
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_ID_LISTEN
Definition control.h:57
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:138
Head of a doubly linked list.
Definition dlist.h:51
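
The dlist functions above are intrusive: each element embeds its own fr_dlist_t, and the head records the field's offset at init time. A self-contained sketch, with a hypothetical item_t:

    #include <freeradius-devel/util/dlist.h>

    typedef struct {
        fr_dlist_t entry;   /* intrusive linkage */
        int        value;
    } item_t;

    void dlist_example(void)
    {
        fr_dlist_head_t head;
        item_t          a = { .value = 1 }, b = { .value = 2 };

        fr_dlist_init(&head, item_t, entry);    /* records offsetof(item_t, entry) */
        fr_dlist_insert_tail(&head, &a);
        fr_dlist_insert_tail(&head, &b);

        while (fr_dlist_num_elements(&head) > 0) {
            item_t *i = fr_dlist_pop_head(&head);   /* unlink and return the head item */
            (void) i;
        }
    }
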
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:322
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
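
fr_heap_insert() and fr_heap_pop() take a double pointer because the heap may be reallocated as it grows; each element embeds an fr_heap_index_t so extraction stays cheap. A sketch, assuming a hypothetical job_t and talloc'd elements (fr_heap_talloc_alloc() verifies the talloc type):

    #include <freeradius-devel/util/heap.h>

    typedef struct {
        fr_heap_index_t idx;        /* maintained by the heap */
        uint32_t        priority;
    } job_t;

    static int8_t job_cmp(void const *one, void const *two)
    {
        job_t const *a = one, *b = two;
        return (a->priority > b->priority) - (a->priority < b->priority);
    }

    void heap_example(TALLOC_CTX *ctx)
    {
        fr_heap_t *h;
        job_t     *job;

        MEM(h = fr_heap_talloc_alloc(ctx, job_cmp, job_t, idx, 0));

        job = talloc_zero(h, job_t);
        job->priority = 1;
        fr_heap_insert(&h, job);        /* may reallocate the heap, hence &h */

        job = fr_heap_peek(h);          /* inspect the top without removing it */
        if (job && fr_heap_entry_inserted(job->idx)) {
            job = fr_heap_pop(&h);      /* remove the top element */
        }
    }
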
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for the current request.
Definition interpret.c:941
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:1994
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition interpret.c:1529
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:2025
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize an unlang compiler / interpreter.
Definition interpret.c:1953
bool unlang_request_is_cancelled(request_t const *request)
Return whether a request has been cancelled.
Definition interpret.c:1579
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1396
bool unlang_interpret_is_resumable(request_t *request)
Check if a request is resumable.
Definition interpret.c:1598
#define UNLANG_REQUEST_RESUME
Definition interpret.h:43
#define UNLANG_TOP_FRAME
Definition interpret.h:36
External functions provided by the owner of the interpreter.
Definition interpret.h:110
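
Within the worker these calls fit together as: bind the worker-local interpreter to the request, run it, and (on timeout) signal a cancel. A sketch; treating UNLANG_REQUEST_RESUME as the boolean "running" argument is an assumption based on the signature above:

    /* Bind the worker-local interpreter, then run the request. */
    unlang_interpret_set(request, worker->intp);
    rlm_rcode_t rcode = unlang_interpret(request, UNLANG_REQUEST_RESUME);
    (void) rcode;

    /* Enforcing max_request_time: ask the interpreter to stop the request. */
    if (!unlang_request_is_cancelled(request)) {
        unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
    }
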
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
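
The control plane is how the network side hands a new channel to a worker: the sender serialises a message into the plane, and the callback registered for that ID fires in the worker's event loop. A sketch; the payload convention (a channel pointer passed by value) is an assumption:

    /* Worker side: create the plane over its atomic queue, register a handler. */
    fr_control_t *control;
    MEM(control = fr_control_create(ctx, el, aq));
    fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback);

    /* Sender side: push the channel pointer through the plane. */
    fr_channel_t *ch = NULL;    /* the channel to announce */
    fr_control_message_send(control, rb, FR_CONTROL_ID_CHANNEL, &ch, sizeof(ch));
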
void const * app_instance
Definition listen.h:39
fr_app_t const * app
Definition listen.h:38
void const * app_io_instance
I/O path configuration context.
Definition listen.h:33
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:41
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
Minimal data structure to use the new code.
Definition listen.h:59
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define RERROR(fmt,...)
Definition log.h:298
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RPERROR(fmt,...)
Definition log.h:302
#define RPEDEBUG(fmt,...)
Definition log.h:376
#define RDEBUG4(fmt,...)
Definition log.h:344
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:641
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2194
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2059
talloc_free(reap)
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2030
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Definition event.c:2383
Stores all information relating to an event list.
Definition event.c:377
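
fr_worker()'s main loop is built from exactly these primitives: corral events (sleeping only when there is no runnable work), then service them. A condensed sketch:

    while (!worker->exiting) {
        bool wait = (fr_heap_num_elements(worker->runnable) == 0);  /* sleep only when idle */
        int  num;

        num = fr_event_corral(el, fr_time(), wait);  /* gather FD + timer events */
        if (num < 0) break;                          /* event loop error */
        if (num > 0) fr_event_service(el);           /* dispatch the callbacks */

        if (fr_event_loop_exiting(el)) break;
        /* ... run one request off the runnable heap ... */
    }
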
fr_log_lvl_t
Definition log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
unsigned int uint32_t
long int ssize_t
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1238
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:934
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
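
The message-set entries above describe a reserve/trim protocol: reserve worst-case space in the ring buffer, encode into it, then shrink the reservation to the bytes actually used. Since fr_message_t is the first member of fr_channel_data_t, the cast below is safe; encode_reply() is hypothetical:

    fr_message_set_t  *ms;
    fr_channel_data_t *reply;
    size_t             used;

    MEM(ms = fr_message_set_create(ctx, 8192, sizeof(fr_channel_data_t), 1 << 20));

    reply = (fr_channel_data_t *) fr_message_reserve(ms, 4096);  /* worst-case size */
    used = encode_reply(reply->m.data, 4096);                    /* hypothetical encoder */

    /* Trim the reservation down to the actual packet size. */
    reply = (fr_channel_data_t *) fr_message_alloc(ms, &reply->m, used);

    /* ... queue it to the channel; the receiver eventually calls ... */
    fr_message_done(&reply->m);
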
static const conf_parser_t config[]
Definition base.c:186
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
Definition radclient.h:52
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree that verifies elements are of a specific talloc type.
Definition rb.h:246
The main red black tree structure.
Definition rb.h:73
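
The worker's listeners tree uses exactly this trio: allocate a tree with an inline node, insert when a listener arrives, find/delete when it goes away. A generic sketch with a hypothetical node_t:

    typedef struct {
        fr_rb_node_t node;   /* intrusive linkage inside the tree */
        void const  *key;
    } node_t;

    static int8_t node_cmp(void const *one, void const *two)
    {
        node_t const *a = one, *b = two;
        return (a->key > b->key) - (a->key < b->key);
    }

    void rb_example(TALLOC_CTX *ctx)
    {
        fr_rb_tree_t *tree;
        node_t       *n, needle;

        MEM(tree = fr_rb_inline_talloc_alloc(ctx, node_t, node, node_cmp, NULL));

        n = talloc_zero(tree, node_t);
        n->key = n;
        fr_rb_insert(tree, n);

        needle.key = n;
        n = fr_rb_find(tree, &needle);   /* returns the data, not the node */
        if (n) fr_rb_delete(tree, n);    /* would also free it, had a free func been given */
    }
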
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:40
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
Definition rcode.h:50
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
Definition request.c:383
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:542
#define REQUEST_VERIFY(_x)
Definition request.h:305
#define request_is_detached(_x)
Definition request.h:186
#define request_is_external(_x)
Definition request.h:184
#define REQUEST_POOL_HEADERS
Definition request.h:66
#define request_is_internal(_x)
Definition request.h:185
@ REQUEST_TYPE_EXTERNAL
A request received on the wire.
Definition request.h:178
#define request_is_detachable(_x)
Definition request.h:187
#define request_init(_ctx, _type, _args)
Definition request.h:317
#define REQUEST_POOL_SIZE
Definition request.h:79
Optional arguments for initialising requests.
Definition request.h:283
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
@ FR_SIGNAL_DUP
A duplicate request was received.
Definition signal.h:44
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
Definition slab.h:120
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
Definition slab.h:72
unsigned int num_children
How many child allocations are expected off each element.
Definition slab.h:48
size_t child_pool_size
Size of pool space to be allocated to each element.
Definition slab.h:49
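
FR_SLAB_TYPES() and FR_SLAB_FUNCS() are what produce the request_slab_list_t used by the worker below: invoked once per element type, they generate a typed slab list plus reserve/release wrappers. The pattern, with the generated names inferred from the _name prefix (an assumption):

    FR_SLAB_TYPES(request, request_t)   /* declares request_slab_list_t et al. */
    FR_SLAB_FUNCS(request, request_t)   /* defines request_slab_reserve(), ... (names assumed) */

The worker attaches request_slab_deinit() (listed below) so elements are cleaned up when they are returned to the slab.
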
return count
Definition module.c:155
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
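
These two stdatomic entries exist for one job: the global request_number counter below, from which worker_request_name_number() draws monotonically increasing request numbers across all worker threads. The whole pattern is one line:

    uint64_t number;

    /* Atomically take the next global request number (sequentially consistent). */
    number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
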
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:229
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition time.c:547
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition time.c:592
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_eq(_a, _b)
Definition time.h:241
#define fr_time_delta_eq(_a, _b)
Definition time.h:287
#define NSEC
Definition time.h:379
#define fr_time_add(_a, _b)
Add a time and a time delta together.
Definition time.h:196
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
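
A quick orientation for the time helpers: fr_time_t is an absolute "server local" time, fr_time_delta_t a signed nanosecond difference, and the macros keep the two types from mixing. For example, the max_request_time check reduces to the following (request_start is hypothetical):

    fr_time_t       now     = fr_time();
    fr_time_delta_t elapsed = fr_time_sub(now, request_start);  /* time - time = delta */

    if (!fr_time_delta_lt(elapsed, worker->max_request_time)) {
        /* over budget: terminate the request */
    }

    /* Deltas are constructed from explicit units rather than raw nanoseconds. */
    fr_time_delta_t grace = fr_time_delta_from_msec(500);
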
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
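
The tracking functions above form a strict per-request lifecycle, with yielded time split out from running time; fr_time_tracking_end() also folds the sample into the worker's "predicted" estimate. A sketch, assuming the worker fields listed elsewhere on this page:

    fr_time_tracking_t tt;

    fr_time_tracking_start(&worker->tracking, &tt, fr_time());  /* begins in the running state */
    fr_time_tracking_yield(&tt, fr_time());                     /* request blocked: stop the clock */
    fr_time_tracking_resume(&tt, fr_time());                    /* runnable again */
    fr_time_tracking_end(&worker->predicted, &tt, fr_time());   /* finished: update the estimate */

    /* tt.running_total + tt.waiting_total account for the request's wall time. */
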
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
Definition timer.c:1149
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
Definition timer.c:1287
int fr_timer_list_force_run(fr_timer_list_t *tl)
Forcibly run all events in an event loop.
Definition timer.c:920
fr_timer_list_t * fr_timer_list_lst_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new lst based timer list.
Definition timer.c:1261
An event timer list.
Definition timer.c:50
#define fr_timer_in(...)
Definition timer.h:87
static bool fr_timer_armed(fr_timer_t *ev)
Definition timer.h:120
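
The worker keeps two timer lists: an ordered one for the uniform max_request_time, and an lst for custom timeouts, both driven from the event loop. A sketch of the ordered list plus the timeout-callback shape (mirroring _worker_request_timeout() below; passing NULL as the parent list is an assumption):

    /* Shape of a timeout callback, as used for max_request_time. */
    static void request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
    {
        request_t *request = talloc_get_type_abort(uctx, request_t);

        /* force-stop the request, reply or NAK as appropriate */
        (void) request;
    }

    fr_timer_list_t *tl;
    MEM(tl = fr_timer_list_ordered_alloc(ctx, NULL));   /* NULL: no parent list (assumption) */

    if (fr_timer_list_num_events(tl) > 0) {
        /* the oldest request's timer is still pending */
    }
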
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition compile.c:5230
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:732
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1288
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition worker.c:118
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition worker.c:607
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition worker.c:1276
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition worker.c:254
fr_event_list_t * el
our event list
Definition worker.c:114
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1587
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition worker.c:624
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1645
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition worker.c:127
fr_channel_t * ch
Definition worker.c:87
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition worker.c:1325
static void worker_exit(fr_worker_t *worker)
Definition worker.c:232
#define WORKER_VERIFY
Definition worker.c:75
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition worker.c:139
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition worker.c:1738
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition worker.c:1265
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition worker.c:749
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1374
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition worker.c:1009
fr_worker_channel_t * channel
list of channels
Definition worker.c:142
char const * name
name of this worker
Definition worker.c:100
uint64_t num_active
number of active requests
Definition worker.c:134
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1774
static _Thread_local fr_ring_buffer_t * fr_worker_rb
Definition worker.c:84
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition worker.c:578
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition worker.c:1721
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
Definition worker.c:802
fr_dlist_head_t dlist
Definition worker.c:93
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
Definition worker.c:1201
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition worker.c:1285
fr_rb_tree_t * dedup
de-dup tree
Definition worker.c:125
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition worker.c:110
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition worker.c:420
static void worker_request_name_number(request_t *request)
Definition worker.c:789
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition worker.c:531
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition worker.c:807
fr_log_t const * log
log destination
Definition worker.c:107
fr_io_stats_t stats
input / output stats
Definition worker.c:129
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition worker.c:1234
static int _fr_worker_rb_free(void *arg)
Definition worker.c:172
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition worker.c:137
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition worker.c:1116
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1032
uint64_t num_naks
number of messages which were nak'd
Definition worker.c:133
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition worker.c:772
fr_worker_config_t config
external configuration
Definition worker.c:101
fr_listen_t const * listener
incoming packets
Definition worker.c:148
unlang_interpret_t * intp
Worker's local interpreter.
Definition worker.c:103
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:371
static void worker_stop_request(request_t *request)
Signal the unlang interpreter that it needs to stop running the request.
Definition worker.c:512
static void _worker_request_prioritise(request_t *request, void *uctx)
Update a request's priority.
Definition worker.c:1302
bool exiting
are we exiting?
Definition worker.c:140
fr_log_lvl_t lvl
log level
Definition worker.c:108
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition worker.c:223
int num_channels
actual number of channels
Definition worker.c:116
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.c:123
static atomic_uint64_t request_number
Definition worker.c:79
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition worker.c:130
fr_rb_node_t node
in tree of listeners
Definition worker.c:150
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1672
int fr_worker_request_timeout_set(fr_worker_t *worker, request_t *request, fr_time_delta_t timeout)
Set, or re-set the request timer.
Definition worker.c:560
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1608
fr_dlist_head_t dlist
of requests associated with this listener.
Definition worker.c:156
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1529
request_slab_list_t * slab
slab allocator for request_t
Definition worker.c:144
static uint32_t worker_num_requests(fr_worker_t *worker)
Definition worker.c:797
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition worker.c:136
pthread_t thread_id
my thread ID
Definition worker.c:105
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition worker.c:213
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition worker.c:131
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition worker.c:160
static bool is_worker_thread(fr_worker_t const *worker)
Definition worker.c:199
fr_timer_list_t * timeout
Track when requests time out, using a dlist.
Definition worker.c:120
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition worker.c:181
fr_control_t * control
the control plane
Definition worker.c:112
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition worker.c:1294
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition worker.c:1185
fr_timer_list_t * timeout_custom
Track when requests time out, using an lst.
Definition worker.c:121
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition worker.c:1620
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition worker.c:1097
#define CACHE_LINE_SIZE
Definition worker.c:78
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition worker.c:992
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition worker.c:398
A worker which takes packets from a master, and processes them.
Definition worker.c:99
int message_set_size
default start number of messages
Definition worker.h:64
int max_requests
max requests this worker will handle
Definition worker.h:60
int max_channels
maximum number of channels
Definition worker.h:62
fr_slab_config_t reuse
slab allocator configuration
Definition worker.h:69
int ring_buffer_size
default start size for the ring buffers
Definition worker.h:65
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:67
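
Putting the configuration struct together with fr_worker_alloc() above, spawning and running a worker reduces to the following sketch (the values and the L_DBG_LVL_1 level are illustrative):

    fr_worker_config_t config = {
        .max_requests     = 1024,
        .max_channels     = 64,
        .message_set_size = 1024,        /* starting number of messages */
        .ring_buffer_size = (1 << 17),   /* starting ring buffer size */
        .max_request_time = fr_time_delta_from_sec(30),
    };
    fr_worker_t *worker;

    worker = fr_worker_alloc(ctx, el, "worker 0", logger, L_DBG_LVL_1, &config);
    if (!worker) {
        fr_perror("worker");
        return;
    }

    fr_worker(worker);           /* blocks in the main loop until told to exit */
    fr_worker_destroy(worker);   /* cancels remaining requests and cleans up */
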