The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
worker.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 /**
18 * $Id: 0d238284ca7ad12ac35e3f4868b61030a19b9d67 $
19 *
20 * @brief Worker thread functions.
21 * @file io/worker.c
22 *
23 * The "worker" thread is the one responsible for the bulk of the
24 * work done when processing a request. Workers are spawned by the
25 * scheduler, and create a kqueue (KQ) and control-plane
26 * Atomic Queue (AQ) for control-plane communication.
27 *
28 * When a network thread discovers that it needs more workers, it
29 * asks the scheduler for a KQ/AQ combination. The network thread
30 * then creates a channel dedicated to that worker, and sends the
31 * channel to the worker in a "new channel" message. The worker
32 * receives the channel, and sends an ACK back to the network thread.
33 *
34 * The network thread then sends the worker new packets, which the
35 * worker receives and processes.
36 *
37 * When a packet is decoded, it is put into the "runnable" heap, and
38 * also into the timeout sublist. The main loop fr_worker() then
39 * pulls new requests off of this heap and runs them. The main event
40 * loop checks the head of the timeout sublist, and forcefully terminates
41 * any requests which have been running for too long.
42 *
43 * If a request is yielded, it is placed onto the yielded list in
44 * the worker "tracking" data structure.
45 *
46 * @copyright 2016 Alan DeKok (aland@freeradius.org)
47 */
48
49RCSID("$Id: 0d238284ca7ad12ac35e3f4868b61030a19b9d67 $")
50
51#define LOG_PREFIX worker->name
52#define LOG_DST worker->log
53
54#include <freeradius-devel/io/channel.h>
55#include <freeradius-devel/io/listen.h>
56#include <freeradius-devel/io/worker.h>
57#include <freeradius-devel/unlang/base.h>
58#include <freeradius-devel/util/minmax_heap.h>
59#include <freeradius-devel/util/timer.h>
60
61#include <stdalign.h>
62
63#ifdef WITH_VERIFY_PTR
64static void worker_verify(fr_worker_t *worker);
65#define WORKER_VERIFY worker_verify(worker)
66#else
67#define WORKER_VERIFY
68#endif
69
70static _Atomic(uint64_t) request_number = 0;
71
74
75static _Thread_local fr_ring_buffer_t *fr_worker_rb;
76
77typedef struct {
78 fr_channel_t *ch;
79
80 /*
81 * To save time, we don't care about num_elements here. Which means that we don't
82 * need to cache or lookup the fr_worker_listen_t when we free a request.
83 */
84 fr_dlist_head_t dlist;
86
87/**
88 * A worker which takes packets from a master, and processes them.
89 */
91 char const *name; //!< name of this worker
92 fr_worker_config_t config; //!< external configuration
93
94 unlang_interpret_t *intp; //!< Worker's local interpreter.
95
96 pthread_t thread_id; //!< my thread ID
97
98 fr_log_t const *log; //!< log destination
99 fr_log_lvl_t lvl; //!< log level
100
101 fr_atomic_queue_t *aq_control; //!< atomic queue for control messages sent to me
102
103 fr_control_t *control; //!< the control plane
104
105 fr_event_list_t *el; //!< our event list
106
107 int num_channels; //!< actual number of channels
108
109 fr_heap_t *runnable; //!< current runnable requests which we've spent time processing
110
111 fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist.
112 fr_time_delta_t max_request_time; //!< maximum time a request can be processed
113
114 fr_rb_tree_t *dedup; //!< de-dup tree
115
116 fr_rb_tree_t *listeners; //!< so we can cancel requests when a listener goes away
117
118 fr_io_stats_t stats; //!< input / output stats
119 fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
120 fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
121
122 uint64_t num_naks; //!< number of messages which were nak'd
123 uint64_t num_active; //!< number of active requests
124
125 fr_time_delta_t predicted; //!< How long we predict a request will take to execute.
126 fr_time_tracking_t tracking; //!< how much time the worker has spent doing things.
127
128 bool was_sleeping; //!< used to suppress multiple sleep signals in a row
129 bool exiting; //!< are we exiting?
130
131 fr_worker_channel_t *channel; //!< list of channels
132
133 request_slab_list_t *slab; //!< slab allocator for request_t
134};
135
136typedef struct {
137 fr_listen_t const *listener; //!< incoming packets
138
139 fr_rb_node_t node; //!< in tree of listeners
140
141 /*
142 * To save time, we don't care about num_elements here. Which means that we don't
143 * need to cache or lookup the fr_worker_listen_t when we free a request.
144 */
145 fr_dlist_head_t dlist; //!< of requests associated with this listener.
147
148
149static int8_t worker_listener_cmp(void const *one, void const *two)
150{
151 fr_worker_listen_t const *a = one, *b = two;
152
153 return CMP(a->listener, b->listener);
154}
155
156
157/*
158 * Explicitly cleanup the memory allocated to the ring buffer,
159 * just in case valgrind complains about it.
160 */
/** Thread-exit hook which frees the thread-local worker ring buffer
 *
 * @param[in] arg	the fr_ring_buffer_t allocated for this thread.
 * @return the result of talloc_free() (0 on success).
 */
static int _fr_worker_rb_free(void *arg)
{
	return talloc_free(arg);
}
165
166/** Initialise thread local storage
167 *
168 * @return fr_ring_buffer_t for messages
169 */
{

	/*
	 *	Return the cached per-thread ring buffer if it was
	 *	already created for this thread.
	 */
	rb = fr_worker_rb;
	if (rb) return rb;

	/*
	 *	NOTE(review): the allocation call itself is not visible in
	 *	this extract — on failure 'rb' is NULL and we error out.
	 */
	if (!rb) {
		fr_perror("Failed allocating memory for worker ring buffer");
		return NULL;
	}

	return rb;
}
187
188static inline bool is_worker_thread(fr_worker_t const *worker)
189{
190 return (pthread_equal(pthread_self(), worker->thread_id) != 0);
191}
192
194static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
195
196/** Callback which handles a message being received on the worker side.
197 *
198 * @param[in] ctx the worker
199 * @param[in] ch the channel to drain
200 * @param[in] cd the message (if any) to start with
201 */
202static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
203{
204 fr_worker_t *worker = ctx;
205
206 worker->stats.in++;
207 DEBUG3("Received request %" PRIu64 "", worker->stats.in);
208 cd->channel.ch = ch;
209 worker_request_bootstrap(worker, cd, fr_time());
210}
211
213{
214 request_t *request;
215
216 while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
218 }
219}
220
221static void worker_exit(fr_worker_t *worker)
222{
223 worker->exiting = true;
224
225 /*
226 * Don't allow the post event to run
227 * any more requests. They'll be
228 * signalled to stop before we exit.
229 *
230 * This only has an effect in single
231 * threaded mode.
232 */
233 (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
234}
235
236/** Handle a control plane message sent to the worker via a channel
237 *
238 * @param[in] ctx the worker
239 * @param[in] data the message
240 * @param[in] data_size size of the data
241 * @param[in] now the current time
242 */
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	int		i;
	bool		ok, was_sleeping;
	fr_channel_t	*ch;
	fr_worker_t	*worker = ctx;

	was_sleeping = worker->was_sleeping;
	worker->was_sleeping = false;

	/*
	 *	We were woken up by a signal to do something.  We're
	 *	not sleeping.
	 */
	ce = fr_channel_service_message(now, &ch, data, data_size);
	DEBUG3("Channel %s",
	       fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
	switch (ce) {
	case FR_CHANNEL_ERROR:
		return;

	case FR_CHANNEL_EMPTY:
		return;

	case FR_CHANNEL_NOOP:
		return;

		/* NOTE(review): case label missing from this extract - this arm should be unreachable */
		fr_assert(0 == 1);
		break;

		/* NOTE(review): case label missing from this extract - data ready on the channel */
		fr_assert(ch != NULL);

		/*
		 *	Drain every pending request from the channel.
		 *	If nothing was pending, restore the sleep state
		 *	we saved above.
		 */
		if (!fr_channel_recv_request(ch)) {
			worker->was_sleeping = was_sleeping;

		} else while (fr_channel_recv_request(ch));
		break;

	case FR_CHANNEL_OPEN:
		fr_assert(ch != NULL);

		/*
		 *	Find a free slot in the channel array, and
		 *	allocate the message set used for replies on
		 *	this channel.
		 */
		ok = false;
		for (i = 0; i < worker->config.max_channels; i++) {
			fr_assert(worker->channel[i].ch != ch);

			if (worker->channel[i].ch != NULL) continue;

			worker->channel[i].ch = ch;
			fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);

			DEBUG3("Received channel %p into array entry %d", ch, i);

			ms = fr_message_set_create(worker, worker->config.message_set_size,
						   sizeof(fr_channel_data_t),
						   worker->config.ring_buffer_size, false);
			fr_assert(ms != NULL);

			worker->num_channels++;
			ok = true;
			break;
		}

		fr_cond_assert(ok);
		break;

	case FR_CHANNEL_CLOSE:
		fr_assert(ch != NULL);

		ok = false;

		/*
		 *	Locate the signalling channel in the list
		 *	of channels.
		 */
		for (i = 0; i < worker->config.max_channels; i++) {
			if (!worker->channel[i].ch) continue;

			if (worker->channel[i].ch != ch) continue;

			/*
			 *	Stop all requests which arrived on this
			 *	channel before tearing it down.
			 */
			worker_requests_cancel(&worker->channel[i]);

			fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
				      "Network added messages to channel after sending FR_CHANNEL_CLOSE");

			fr_assert(ms != NULL);
			talloc_free(ms);

			worker->channel[i].ch = NULL;

			fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
			fr_assert(worker->num_channels > 0);

			worker->num_channels--;
			ok = true;
			break;
		}

		fr_cond_assert(ok);

		/*
		 *	Our last input channel closed,
		 *	time to die.
		 */
		if (worker->num_channels == 0) worker_exit(worker);
		break;
	}
}
359
{
	request_t	*request;

	/*
	 *	Look up the tracking entry for this listener.  If we
	 *	don't know about it, there's nothing to cancel.
	 */
	wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
	if (!wl) return -1;

	/*
	 *	Drain and stop every request still associated with
	 *	this listener.
	 */
	while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
		RDEBUG("Canceling request due to socket being closed");
	}

	/*
	 *	Forget the listener entirely.
	 */
	(void) fr_rb_delete(worker->listeners, wl);
	talloc_free(wl);

	return 0;
}
378
379
380/** A socket is going away, so clean up any requests which use this socket.
381 *
382 * @param[in] ctx the worker
383 * @param[in] data the message
384 * @param[in] data_size size of the data
385 * @param[in] now the current time
386 */
387static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
388{
389 fr_listen_t const *li;
390 fr_worker_t *worker = ctx;
391
392 fr_assert(data_size == sizeof(li));
393
394 memcpy(&li, data, sizeof(li));
395
396 (void) fr_worker_listen_cancel_self(worker, li);
397}
398
399/** Send a NAK to the network thread
400 *
401 * The network thread believes that a worker is running a request until that request has been NAK'd.
402 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
403 * or there was an error executing the request.
404 *
405 * @param[in] worker the worker
406 * @param[in] cd the message to NAK
407 * @param[in] now when the message is NAKd
408 */
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	size_t			size;
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	fr_listen_t		*listen;

	worker->num_naks++;

	/*
	 *	Cache the outbound channel.  We'll need it later.
	 */
	ch = cd->channel.ch;
	listen = cd->listen;

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
		fr_message_done(&cd->m);
		return;
	}

	fr_assert(ms != NULL);

	/*
	 *	Pick a reasonable reply size from the listener's hints.
	 */
	size = listen->app_io->default_reply_size;
	if (!size) size = listen->app_io->default_message_size;

	/*
	 *	Allocate a default message size.
	 */
	MEM(reply = (fr_channel_data_t *) fr_message_reserve(ms, size));

	/*
	 *	Encode a NAK
	 */
	if (listen->app_io->nak) {
		size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
					   cd->m.data_size, reply->m.data, reply->m.rb_size);
	} else {
		size = 1; /* rely on them to figure it the heck out */
	}

	(void) fr_message_alloc(ms, &reply->m, size);

	/*
	 *	Fill in the NAK.
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = fr_time_delta_from_msec(1); /* @todo - set to something better? */
	reply->reply.request_time = cd->request.recv_time;

	reply->listen = cd->listen;
	reply->packet_ctx = cd->packet_ctx;

	/*
	 *	Mark the original message as done.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		DEBUG2("Failed sending reply to channel");
	}

	worker->stats.out++;
}
485
486/** Signal the unlang interpreter that it needs to stop running the request
487 *
488 * Signalling is a synchronous operation. Whatever I/O requests the request
489 * is currently performing are immediately cancelled, and all the frames are
490 * popped off the unlang stack.
491 *
492 * Modules and unlang keywords explicitly register signal handlers to deal
493 * with their yield points being cancelled/interrupted via this function.
494 *
495 * The caller should assume the request is no longer viable after calling
496 * this function.
497 *
498 * @param[in] request request to cancel. The request may still run to completion.
499 */
static void worker_stop_request(request_t *request)
{
	/*
	 *	Also marks the request as done and runs
	 *	the internal/external callbacks.
	 *
	 *	NOTE(review): the actual signal call is missing from
	 *	this extract.
	 */
}
508
509/** Enforce max_request_time
510 *
511 * Run periodically, and tries to clean up requests which were received by the network
512 * thread more than max_request_time seconds ago. In the interest of not adding a
513 * timer for every packet, the requests are given a 1 second leeway.
514 *
515 * @param[in] tl the worker's timer list.
516 * @param[in] when the current time
517 * @param[in] uctx the request_t timing out.
518 */
{
	request_t *request = talloc_get_type_abort(uctx, request_t);

	/*
	 *	Waiting too long, delete it.
	 */
	REDEBUG("Request has reached max_request_time - signalling it to stop");
	worker_stop_request(request);

	/*
	 *	This ensures the finally section can run timeout specific policies
	 */
	request->rcode = RLM_MODULE_TIMEOUT;
}
534
535
536/** Start time tracking for a request, and mark it as runnable.
537 *
538 */
{
	/*
	 *	New requests are inserted into the time order heap in
	 *	strict time priority.  Once they are in the list, they
	 *	are only removed when the request is done / free'd.
	 */
	fr_assert(!fr_timer_armed(request->timeout));

	/*
	 *	Arm the max_request_time timer.  When it fires the
	 *	request is forcefully stopped.
	 */
	if (unlikely(fr_timer_in(request, worker->timeout, &request->timeout, worker->config.max_request_time,
				 true, _worker_request_timeout, request) < 0)) {
		RERROR("Failed to set request timeout timer");
		return -1;
	}

	/*
	 *	Bootstrap the async state machine with the initial
	 *	state of the request.
	 */
	RDEBUG3("Time tracking started in yielded state");
	fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
	fr_time_tracking_yield(&request->async->tracking, now);
	worker->num_active++;

	fr_assert(!fr_heap_entry_inserted(request->runnable));
	(void) fr_heap_insert(&worker->runnable, request);

	return 0;
}
568
{
	RDEBUG3("Time tracking ended");
	fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
	fr_assert(worker->num_active > 0);
	worker->num_active--;

	TALLOC_FREE(request->timeout);		/* Disarm the request timer */
}
578
579/** Send a response packet to the network side
580 *
581 * @param[in] worker This worker.
582 * @param[in] request we're sending a reply for.
583 * @param[in] send_reply whether the network side sends a reply
584 * @param[in] now The current time
585 */
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
{
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	size_t			size = 1;

	REQUEST_VERIFY(request);

	/*
	 *	If we're sending a reply, then it's no longer runnable.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	if (send_reply) {
		size = request->async->listen->app_io->default_reply_size;
		if (!size) size = request->async->listen->app_io->default_message_size;
	}

	/*
	 *	Allocate and send the reply.
	 */
	ch = request->async->channel;
	fr_assert(ch != NULL);

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
		return;
	}

	fr_assert(ms != NULL);

	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode it, if required.
	 */
	if (send_reply) {
		ssize_t slen = 0;
		fr_listen_t const *listen = request->async->listen;

		if (listen->app_io->encode) {
			slen = listen->app_io->encode(listen->app_io_instance, request,
						      reply->m.data, reply->m.rb_size);
		} else if (listen->app->encode) {
			slen = listen->app->encode(listen->app_instance, request,
						   reply->m.data, reply->m.rb_size);
		}
		if (slen < 0) {
			RPERROR("Failed encoding request");
			*reply->m.data = 0;
			slen = 1;
		}

		/*
		 *	Shrink the buffer to the actual packet size.
		 *
		 *	This will ALWAYS return the same message as we put in.
		 */
		fr_assert((size_t) slen <= reply->m.rb_size);
		(void) fr_message_alloc(ms, &reply->m, slen);
	}

	/*
	 *	Fill in the rest of the fields in the channel message.
	 *
	 *	sequence / ack will be filled in by fr_channel_send_reply()
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = request->async->tracking.running_total;
	reply->reply.request_time = request->async->recv_time;

	reply->listen = request->async->listen;
	reply->packet_ctx = request->async->packet_ctx;

	/*
	 *	Update the various timers.
	 */
	fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
	fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);

	RDEBUG("Finished request");

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		/*
		 *	Should only happen if the TO_REQUESTOR
		 *	channel is full, or it's not yet active.
		 *
		 *	Not much we can do except complain
		 *	loudly and cleanup the request.
		 */
		RPERROR("Failed sending reply to network thread");
	}

	worker->stats.out++;

	fr_assert(!fr_timer_armed(request->timeout));
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	fr_dlist_entry_unlink(&request->listen_entry);

#ifndef NDEBUG
	request->async->el = NULL;
	request->async->channel = NULL;
	request->async->packet_ctx = NULL;
	request->async->listen = NULL;
#endif
}
706
707/*
708 * talloc_typed_asprintf() is horrifically slow for printing
709 * simple numbers.
710 */
711static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
712{
713 char buffer[32];
714 char *p;
715 char const *numbers = "0123456789";
716
717 p = buffer + 30;
718 *(p--) = '\0';
719
720 while (number > 0) {
721 *(p--) = numbers[number % 10];
722 number /= 10;
723 }
724
725 if (p[1]) return talloc_strdup(ctx, p + 1);
726
727 return talloc_strdup(ctx, "0");
728}
729
730/** Initialize various request fields needed by the worker.
731 *
732 */
static inline CC_HINT(always_inline)
{
	/*
	 *	For internal requests request->packet
	 *	and request->reply are already populated.
	 */
	if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
	if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));

	/*
	 *	Stamp the packet with the receive time, and hang the
	 *	async tracking state off the request.
	 */
	request->packet->timestamp = now;
	request->async = talloc_zero(request, fr_async_t);
	request->async->recv_time = now;
	request->async->el = worker->el;
	fr_dlist_entry_init(&request->async->entry);
}
749
static inline CC_HINT(always_inline)
{
	/*
	 *	Assign a globally unique, monotonically increasing
	 *	number to the request, and use it as the name.
	 */
	request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
	if (request->name) talloc_const_free(request->name);
	request->name = itoa_internal(request, request->number);
}
757
static inline CC_HINT(always_inline)
{
	/*
	 *	Every live request has an armed timeout timer, so the
	 *	timer list size is the number of outstanding requests.
	 */
	return fr_timer_list_num_events(worker->timeout);
}
763
/** Talloc destructor hook which returns a request to the slab allocator
 *
 * @param[in] request	being freed.
 * @param[in] uctx	the worker (unused).
 * @return the result of request_slab_deinit().
 */
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
{
	return request_slab_deinit(request);
}
768
{
	int		ret = -1;
	request_t	*request;
	fr_listen_t	*listen = cd->listen;

	/*
	 *	Refuse new work when we're already at capacity.
	 */
	if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests");
		goto nak;
	}

	/*
	 *	Receive a message to the worker queue, and decode it
	 *	to a request.
	 */
	fr_assert(listen != NULL);

	request = request_slab_reserve(worker->slab);
	if (!request) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request");
		goto nak;
	}
	/*
	 *	Ensures that both the deinit function runs AND
	 *	the request is returned to the slab if something
	 *	calls talloc_free() on it.
	 */
	request_slab_element_set_destructor(request, _worker_request_deinit, worker);

	/*
	 *	Have to initialise the request manually because namespace
	 *	changes based on the listener that allocated it.
	 */
	if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) {
		request_slab_release(request);
		goto nak;
	}

	/*
	 *	Do normal worker init that's shared between internal
	 *	and external requests.
	 */
	worker_request_init(worker, request, now);

	/*
	 *	Associate our interpreter with the request
	 */
	unlang_interpret_set(request, worker->intp);

	request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */

	/*
	 *	Update the transport-specific fields.
	 */
	request->async->channel = cd->channel.ch;

	request->async->recv_time = cd->request.recv_time;

	request->async->listen = listen;
	request->async->packet_ctx = cd->packet_ctx;
	request->priority = cd->priority;

	/*
	 *	Now that the "request" structure has been initialized, go decode the packet.
	 *
	 *	Note that this also sets the "async process" function.
	 */
	if (listen->app->decode) {
		ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
	} else if (listen->app_io->decode) {
		ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
	}

	if (ret < 0) {
	fail:
		fr_assert(talloc_parent(request->stack) == request);
		request_slab_release(request);

	nak:
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	Set the entry point for this virtual server.
	 */
	if (unlang_call_push(NULL, request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
		RERROR("Protocol failed to set 'process' function");
		goto fail;
	}

	/*
	 *	Look for conflicting / duplicate packets, but only if
	 *	requested to do so.
	 */
	if (request->async->listen->track_duplicates) {
		request_t *old;

		old = fr_rb_find(worker->dedup, request);
		if (!old) {
			goto insert_new;
		}

		fr_assert(old->async->listen == request->async->listen);
		fr_assert(old->async->channel == request->async->channel);

		/*
		 *	There's a new packet.  Do we keep the old one,
		 *	or the new one?  This decision is made by
		 *	checking the recv_time, which is a
		 *	nanosecond-resolution timer.  If the time is
		 *	identical, then the new packet is the same as
		 *	the old one.
		 *
		 *	If the new packet is a duplicate of the old
		 *	one, then we can just discard the new one.  We
		 *	have to tell the channel that we've "eaten"
		 *	this reply, so the sequence number should
		 *	increase.
		 *
		 *	@todo - fix the channel code to do queue
		 *	depth, and not sequence / ack.
		 */
		if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
			RWARN("Discarding duplicate of request (%"PRIu64")", old->number);

			fr_channel_null_reply(request->async->channel);
			request_slab_release(request);

			/*
			 *	Signal there's a dup, and ignore the
			 *	return code.  We don't bother replying
			 *	here, as an FD event or timer will
			 *	wake up the request, and cause it to
			 *	continue.
			 *
			 *	@todo - the old request is NOT
			 *	running, but is yielded.  It MAY clean
			 *	itself up, or do something...
			 */
			worker->stats.dup++;

			fr_message_done(&cd->m);
			return;
		}

		/*
		 *	Stop the old request, and decrement the number
		 *	of active requests.
		 */
		RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);

		worker->stats.dropped++;
		(void) fr_rb_remove(worker->dedup, old);	/* remove, but do NOT free it */

	insert_new:
		(void) fr_rb_insert(worker->dedup, request);
	}

	if (worker_request_time_tracking_start(worker, request, now) < 0) {
		if (request->async->listen->track_duplicates) (void) fr_rb_remove(worker->dedup, request);
		goto fail;
	}

	/*
	 *	We're done with this message.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Track the request against its listener, so that we can
	 *	cancel it if the listener goes away.
	 */
	{

		wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
		if (!wl) {
			MEM(wl = talloc_zero(worker, fr_worker_listen_t));
			fr_dlist_init(&wl->dlist, request_t, listen_entry);
			wl->listener = listen;

			(void) fr_rb_insert(worker->listeners, wl);
		}

		fr_dlist_insert_tail(&wl->dlist, request);
	}
}
956
957/**
958 * Track a request_t in the "runnable" heap.
959 * Higher priorities take precedence, followed by lower sequence numbers
960 */
961static int8_t worker_runnable_cmp(void const *one, void const *two)
962{
963 request_t const *a = one, *b = two;
964 int ret;
965
966 /*
967 * Prefer higher priority packets.
968 */
969 ret = CMP_PREFER_LARGER(b->priority, a->priority);
970 if (ret != 0) return ret;
971
972 /*
973 * Prefer packets which are further along in their processing sequence.
974 */
975 ret = CMP_PREFER_LARGER(a->sequence, b->sequence);
976 if (ret != 0) return ret;
977
978 /*
979 * Smaller timestamp (i.e. earlier) is more important.
980 */
981 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
982}
983
984/**
985 * Track a request_t in the "dedup" tree
986 */
987static int8_t worker_dedup_cmp(void const *one, void const *two)
988{
989 int ret;
990 request_t const *a = one, *b = two;
991
992 ret = CMP(a->async->listen, b->async->listen);
993 if (ret) return ret;
994
995 return CMP(a->async->packet_ctx, b->async->packet_ctx);
996}
997
998/** Destroy a worker
999 *
1000 * The input channels are signaled, and local messages are cleaned up.
1001 *
1002 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
1003 * error has occurred on the worker side, and we need to destroy it.
1004 *
1005 * We signal all pending requests in the backlog to stop, and tell the
1006 * network side that it should not send us any more requests.
1007 *
1008 * @param[in] worker the worker to destroy.
1009 */
{
	int i, count, ret;

//	WORKER_VERIFY;

	/*
	 *	Stop any new requests running with this interpreter
	 */

	/*
	 *	Destroy all of the active requests.  These are ones
	 *	which are still waiting for timers or file descriptor
	 *	events.
	 */
	count = 0;

	/*
	 *	Force the timeout event to fire for all requests that
	 *	are still running.
	 */
	ret = fr_timer_list_force_run(worker->timeout);
	if (unlikely(ret < 0)) {
		fr_assert_msg(0, "Failed to force run the timeout list");
	} else {
		count += ret;
	}

	DEBUG("Worker is exiting - stopped %u requests", count);

	/*
	 *	Signal the channels that we're closing.
	 *
	 *	The other end owns the channel, and will take care of
	 *	popping messages in the TO_RESPONDER queue, and marking
	 *	them FR_MESSAGE_DONE.  It will ignore the messages in
	 *	the TO_REQUESTOR queue, as we own those.  They will be
	 *	automatically freed when our talloc context is freed.
	 */
	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		worker_requests_cancel(&worker->channel[i]);

		fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
			      "Pending messages in channel after cancelling request");

	}

	talloc_free(worker);
}
1065
1066/** Internal request (i.e. one generated by the interpreter) is now complete
1067 *
1068 */
static void _worker_request_internal_init(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	worker_request_init(worker, request, now);

	/*
	 *	Requests generated by the interpreter
	 *	are always marked up as internal.
	 */
	if (worker_request_time_tracking_start(worker, request, now) < 0) {
	}
}
1085
1086
1087/** External request is now complete
1088 *
1089 */
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	/*
	 *	All external requests MUST have a listener.
	 */
	fr_assert(request->async->listen != NULL);

	/*
	 *	Only real packets are in the dedup tree.  And even
	 *	then, only some of the time.
	 */
	if (request->async->listen->track_duplicates && fr_rb_node_inline_in_tree(&request->dedup_node)) {
		(void) fr_rb_delete(worker->dedup, request);
	}

	/*
	 *	If we're running a real request, then the final
	 *	indentation MUST be zero.  Otherwise we skipped
	 *	something!
	 *
	 *	Also check that the request is NOT marked as
	 *	"yielded", but is in fact done.
	 *
	 *	@todo - check that the stack is at frame 0, otherwise
	 *	more things have gone wrong.
	 */
	fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
		      "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
		      "Request %s is marked as yielded at end of processing", request->name);
		      "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
	RDEBUG("Done request");

	/*
	 *	The request is done.  Track that.
	 */
	worker_request_time_tracking_end(worker, request, now);

	/*
	 *	Remove it from the list of requests associated with this channel.
	 */
	if (fr_dlist_entry_in_list(&request->async->entry)) {
		fr_dlist_entry_unlink(&request->async->entry);
	}

	/*
	 *	These conditions are true when the server is
	 *	exiting and we're stopping all the requests.
	 *
	 *	This should never happen otherwise.
	 */
	if (unlikely(!fr_channel_active(request->async->channel))) {
		fr_dlist_entry_unlink(&request->listen_entry);
		request_slab_release(request);
		return;
	}

	worker_send_reply(worker, request, !unlang_request_is_cancelled(request), now);
	request_slab_release(request);
}
1155
1156/** Internal request (i.e. one generated by the interpreter) is now complete
1157 *
1158 * Whatever generated the request is now responsible for freeing it.
1159 */
1160static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1161{
1162 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1163
1164 worker_request_time_tracking_end(worker, request, fr_time());
1165
1166 fr_assert(!fr_heap_entry_inserted(request->runnable));
1167 fr_assert(!fr_timer_armed(request->timeout));
1168 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1169}
1170
1171/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
1172 *
1173 * As the request has no parent, then there's nothing to free it
1174 * so we have to.
1175 */
1176static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
1177{
1178 /*
1179 * No time tracking for detached requests
1180 * so we don't need to call
1181 * worker_request_time_tracking_end.
1182 */
1183 fr_assert(!fr_heap_entry_inserted(request->runnable));
1184
1185 /*
1186 * Normally worker_request_time_tracking_end
1187 * would remove the request from the time
1188 * order heap, but we need to do that for
1189 * detached requests.
1190 */
1191 TALLOC_FREE(request->timeout);
1192
1193 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1194
1195 /*
1196 * Detached requests have to be freed by us
1197 * as nothing else can free them.
1198 *
1199 * All other requests must be freed by the
1200 * code which allocated them.
1201 */
1202 talloc_free(request);
1203}
1204
1205
1206/** Make us responsible for running the request
1207 *
1208 */
1209static void _worker_request_detach(request_t *request, void *uctx)
1210{
1211 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1212 fr_time_t now = fr_time();
1213
1214 RDEBUG4("%s - Request detaching", __FUNCTION__);
1215
1216 if (request_is_detachable(request)) {
1217 /*
1218 * End the time tracking... We don't track detached requests,
1219 * because they don't contribute for the time consumed by an
1220 * external request.
1221 */
1222 if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1223 RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1224 fr_time_tracking_resume(&request->async->tracking, now);
1225 }
1226 worker_request_time_tracking_end(worker, request, now);
1227
1228 if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1229
1230 RDEBUG3("Request is detached");
1231 } else {
1232 fr_assert_msg(0, "Request is not detachable");
1233 }
1234
1235 return;
1236}
1237
1238/** Request is now runnable
1239 *
1240 */
1241static void _worker_request_runnable(request_t *request, void *uctx)
1242{
1243 fr_worker_t *worker = uctx;
1244
1245 RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
1246 fr_heap_insert(&worker->runnable, request);
1247}
1248
1249/** Interpreter yielded request
1250 *
1251 */
1252static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1253{
1254 RDEBUG4("%s - Request yielded", __FUNCTION__);
1255 if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
1256}
1257
1258/** Interpreter is starting to work on request again
1259 *
1260 */
1261static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1262{
1263 RDEBUG4("%s - Request resuming", __FUNCTION__);
1264 if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
1265}
1266
1267/** Check if a request is scheduled
1268 *
1269 */
1270static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1271{
1272 return fr_heap_entry_inserted(request->runnable);
1273}
1274
1275/** Update a request's priority
1276 *
1277 */
1278static void _worker_request_prioritise(request_t *request, void *uctx)
1279{
1280 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1281
1282 RDEBUG4("%s - Request priority changed", __FUNCTION__);
1283
1284 /* Extract the request from the runnable queue _if_ it's in the runnable queue */
1285 if (fr_heap_extract(&worker->runnable, request) < 0) return;
1286
1287 /* Reinsert it to re-evaluate its new priority */
1288 fr_heap_insert(&worker->runnable, request);
1289}
1290
/** Run a request
 *
 * Until it either yields, or is done.
 *
 * This function is also responsible for sending replies, and
 * cleaning up the request.
 *
 * @param[in] worker the worker
 * @param[in] start the current time
 */
static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
{
	request_t *request;
	fr_time_t now;

	now = start;

	/*
	 *	Busy-loop running requests for 1ms.  We still poll the
	 *	event loop 1000 times a second, OR when there's no
	 *	more work to do.  This allows us to make progress with
	 *	ongoing requests, at the expense of sometimes ignoring
	 *	new ones.
	 */
	while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
	       ((request = fr_heap_pop(&worker->runnable)) != NULL)) {

		REQUEST_VERIFY(request);
		fr_assert(!fr_heap_entry_inserted(request->runnable));

		/*
		 *	For real requests, if the channel is gone,
		 *	just stop the request and free it.
		 */
		if (request->async->channel && !fr_channel_active(request->async->channel)) {
			worker_stop_request(request);
			continue;
		}

		/*
		 *	NOTE(review): the statement which actually hands the
		 *	request to the interpreter appears to have been elided
		 *	from this listing - confirm against the full source.
		 */

		/* Refresh "now" so the 1ms budget check above stays accurate */
		now = fr_time();
	}
}
1337
/** Create a worker
 *
 * Allocates and wires up all per-worker state: the channel array, the
 * control plane (atomic queue + callbacks), the runnable heap, the timeout
 * list, the dedup tree, the unlang interpreter (with this file's request
 * lifecycle callbacks), and the request slab allocator.
 *
 * @param[in] ctx the talloc context
 * @param[in] name the name of this worker
 * @param[in] el the event list
 * @param[in] logger the destination for all logging messages
 * @param[in] lvl log level
 * @param[in] config various configuration parameters
 * @return
 *	- NULL on error
 *	- fr_worker_t on success
 */
/*
 *	NOTE(review): the final parameter line of this signature (the config
 *	argument documented above) appears elided from this listing, as do
 *	several lines below (marked inline) - confirm against the full source.
 */
fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
{
	fr_worker_t *worker;

	worker = talloc_zero(ctx, fr_worker_t);
	if (!worker) {
nomem:
		fr_strerror_const("Failed allocating memory");
		return NULL;
	}

	worker->name = talloc_strdup(worker, name);		/* thread locality */


	/* Caller-supplied config overrides the zeroed defaults */
	if (config) worker->config = *config;

/* Clamp a numeric config value into [_min, _max]; 0/unset becomes _min */
#define CHECK_CONFIG(_x, _min, _max) do { \
		if (!worker->config._x) worker->config._x = _min; \
		if (worker->config._x < _min) worker->config._x = _min; \
		if (worker->config._x > _max) worker->config._x = _max; \
	} while (0)

/* Clamp a time-delta config value into [_min, _max] */
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
		if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
		if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
	} while (0)

	CHECK_CONFIG(max_requests,1024,(1 << 30));
	CHECK_CONFIG(max_channels, 64, 1024);
	CHECK_CONFIG(reuse.child_pool_size, 4096, 65536);
	CHECK_CONFIG(message_set_size, 1024, 8192);
	CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));

	worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
	if (!worker->channel) {
		talloc_free(worker);
		goto nomem;
	}

	worker->thread_id = pthread_self();
	worker->el = el;
	worker->log = logger;
	worker->lvl = lvl;

	/*
	 *	The worker thread starts now.  Manually initialize it,
	 *	because we're tracking request time, not the time that
	 *	the worker thread is running.
	 */
	memset(&worker->tracking, 0, sizeof(worker->tracking));

	worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
	if (!worker->aq_control) {
		fr_strerror_const("Failed creating atomic queue");
	fail:
		talloc_free(worker);
		return NULL;
	}

	worker->control = fr_control_create(worker, el, worker->aq_control, 7);
	if (!worker->control) {
		fr_strerror_const_push("Failed creating control plane");
		goto fail;
	}

	/*
	 *	NOTE(review): the conditional opening this error body (most
	 *	likely an fr_control_callback_add() check) appears elided
	 *	from this listing.
	 */
		fr_strerror_const_push("Failed adding control channel");
		goto fail;
	}

	/*
	 *	NOTE(review): the conditional opening this error body
	 *	appears elided from this listing.
	 */
		fr_strerror_const_push("Failed adding callback for listeners");
		goto fail;
	}

	if (fr_control_open(worker->control) < 0) {
		fr_strerror_const_push("Failed opening control plane");
		goto fail;
	}

	worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0);
	if (!worker->runnable) {
		fr_strerror_const("Failed creating runnable heap");
		goto fail;
	}

	worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl);
	if (!worker->timeout) {
		fr_strerror_const("Failed creating timeouts list");
		goto fail;
	}

	worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
	if (!worker->dedup) {
		fr_strerror_const("Failed creating de_dup tree");
		goto fail;
	}

	/* NOTE(review): the listener-tree allocation line appears elided here */
	if (!worker->listeners) {
		fr_strerror_const("Failed creating listener tree");
		goto fail;
	}

	/*
	 *	Hook this file's request lifecycle callbacks into the
	 *	interpreter.
	 *
	 *	NOTE(review): the line opening the callback struct argument
	 *	appears elided from this listing.
	 */
	worker->intp = unlang_interpret_init(worker, el,
						 .init_internal = _worker_request_internal_init,

						 .done_external = _worker_request_done_external,
						 .done_internal = _worker_request_done_internal,
						 .done_detached = _worker_request_done_detached,

						 .detach = _worker_request_detach,
						 .yield = _worker_request_yield,
						 .resume = _worker_request_resume,
						 .mark_runnable = _worker_request_runnable,

						 .scheduled = _worker_request_scheduled,
						 .prioritise = _worker_request_prioritise
					     },
					     worker);
	if (!worker->intp){
		fr_strerror_const("Failed initialising interpreter");
		goto fail;
	}

	{
		/* NOTE(review): slab-configuration lines appear elided here */

		if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL,
							     UNCONST(void *, worker), true, false))) {
			fr_strerror_const("Failed creating request slab list");
			goto fail;
		}
	}


	return worker;
}
1494
1495
/** The main loop and entry point of the stand-alone worker thread.
 *
 *  Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
 *  instead, And then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
 *  request.
 *
 * @param[in] worker the worker data structure to manage
 */
/*
 *	NOTE(review): the function signature line appears elided from this
 *	listing - confirm against the full source.
 */
{

	while (true) {
		bool wait_for_event;
		int num_events;


		/*
		 *	There are runnable requests.  We still service
		 *	the event loop, but we don't wait for events.
		 */
		wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
		if (wait_for_event) {
			/*
			 *	Only exit once we're idle AND all
			 *	outstanding requests have drained.
			 */
			if (worker->exiting && (worker_num_requests(worker) == 0)) break;

			DEBUG4("Ready to process requests");
		}

		/*
		 *	Check the event list.  If there's an error
		 *	(e.g. exit), we stop looping and clean up.
		 */
		DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
		num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
		if (num_events < 0) {
			if (fr_event_loop_exiting(worker->el)) {
				DEBUG4("Event loop exiting");
				break;
			}

			PERROR("Failed retrieving events");
			break;
		}

		DEBUG4("%u event(s) pending", num_events);

		/*
		 *	Service outstanding events.
		 */
		if (num_events > 0) {
			DEBUG4("Servicing event(s)");
			fr_event_service(worker->el);
		}

		/*
		 *	Run any outstanding requests.
		 */
		worker_run_request(worker, fr_time());
	}
}
1557
/** Pre-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
/*
 *	NOTE(review): the function signature line appears elided from this
 *	listing (uctx is the worker) - confirm against the full source.
 */
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	request_t *request;

	/* Nothing queued - let the event loop block waiting for events */
	request = fr_heap_peek(worker->runnable);
	if (!request) return 0;

	/*
	 *	There's work to do.  Tell the event handler to poll
	 *	for IO / timers, but also immediately return to the
	 *	calling function, which has more work to do.
	 */
	return 1;
}
1577
1578
/** Post-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
/*
 *	NOTE(review): the function signature line appears elided from this
 *	listing (uctx is the worker) - confirm against the full source.
 */
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	worker_run_request(worker, fr_time());	/* Event loop time can be too old, and trigger asserts */
}
1589
/** Print debug information about the worker structure
 *
 * Dumps channel/packet counters and the worker's time-tracking state.
 *
 * @param[in] worker the worker
 * @param[in] fp the file where the debug output is printed.
 */
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
{

	fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
	fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);

	/* "predicted" is a per-request time estimate; scaled by packets received */
	fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
	if (worker->stats.in) {
		/*
		 *	NOTE(review): the argument line for this fprintf()
		 *	appears elided from this listing - confirm against
		 *	the full source.
		 */
		fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
	}

	fr_time_tracking_debug(&worker->tracking, fp);

}
1612
/** Create a channel to the worker
 *
 * Called by the master (i.e. network) thread when it needs to create
 * a new channel to a particuler worker.
 *
 * @param[in] worker the worker
 * @param[in] master the control plane of the master
 * @param[in] ctx the context in which the channel will be created
 */
/*
 *	NOTE(review): the function signature line appears elided from this
 *	listing - confirm against the full source.
 */
{
	fr_channel_t *ch;
	pthread_t id;
	bool same;


	/* "same" is true when the caller runs in the worker's own thread */
	id = pthread_self();
	same = (pthread_equal(id, worker->thread_id) != 0);

	ch = fr_channel_create(ctx, master, worker->control, same);
	if (!ch) return NULL;

	/* NOTE(review): one line appears elided here - confirm against the full source */

	/*
	 *	Tell the worker about the channel
	 */
	if (fr_channel_signal_open(ch) < 0) {
		talloc_free(ch);
		return NULL;
	}

	return ch;
}
1648
/*
 *	NOTE(review): the function signature line appears elided from this
 *	listing (takes the worker and a listener "li") - confirm against
 *	the full source.
 */
{
	fr_ring_buffer_t *rb;

	/*
	 *	Skip a bunch of work if we're already in the worker thread.
	 */
	if (is_worker_thread(worker)) {
		return fr_worker_listen_cancel_self(worker, li);
	}

	/* Otherwise notify the worker over its control plane */
	rb = fr_worker_rb_init();
	if (!rb) return -1;

	return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN_DEAD, &li, sizeof(li));
}
1665
#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
 *
 * Sanity-checks the talloc types of the worker's core members, then walks
 * every channel slot and verifies any channel which is in use.
 *
 * @param[in] worker the worker
 */
static void worker_verify(fr_worker_t *worker)
{
	(void) talloc_get_type_abort(worker, fr_worker_t);
	fr_atomic_queue_verify(worker->aq_control);

	fr_assert(worker->control != NULL);
	(void) talloc_get_type_abort(worker->control, fr_control_t);

	fr_assert(worker->el != NULL);
	(void) talloc_get_type_abort(worker->el, fr_event_list_t);

	fr_assert(worker->runnable != NULL);
	(void) talloc_get_type_abort(worker->runnable, fr_heap_t);

	fr_assert(worker->dedup != NULL);
	(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);

	/*
	 *	Unused slots have a NULL channel pointer - skip them.
	 */
	for (int i = 0; i < worker->config.max_channels; i++) {
		fr_channel_t *ch = worker->channel[i].ch;

		if (!ch) continue;

		(void) talloc_get_type_abort(ch, fr_channel_t);
	}
}
#endif
1697
1698int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
1699{
1700 if (num < 0) return -1;
1701 if (num == 0) return 0;
1702
1703 stats[0] = worker->stats.in;
1704 if (num >= 2) stats[1] = worker->stats.out;
1705 if (num >= 3) stats[2] = worker->stats.dup;
1706 if (num >= 4) stats[3] = worker->stats.dropped;
1707 if (num >= 5) stats[4] = worker->num_naks;
1708 if (num >= 6) stats[5] = worker->num_active;
1709
1710 if (num <= 6) return num;
1711
1712 return 6;
1713}
1714
1715static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
1716{
1717 fr_worker_t const *worker = ctx;
1718 fr_time_delta_t when;
1719
1720 if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
1721 fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
1722 fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
1723 fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
1724 fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
1725 fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
1726 fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
1727 fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
1728 }
1729
1730 if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
1731 when = worker->predicted;
1732 fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1733
1734 when = worker->tracking.running_total;
1735 if (fr_time_delta_ispos(when) && (worker->stats.in > worker->stats.dropped)) {
1736 when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
1737 }
1738 fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1739
1740 when = worker->tracking.running_total;
1741 fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1742
1743 when = worker->tracking.waiting_total;
1744 fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1745
1746 fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
1747 fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
1748 }
1749
1750 return 0;
1751}
1752
	/*
	 *	NOTE(review): the table's declaration line (above) and its
	 *	CMD_TABLE_END terminator (before the closing brace) appear
	 *	elided from this listing - confirm against the full source.
	 */
	/* Parent node: "stats worker" */
	{
		.parent = "stats",
		.name = "worker",
		.help = "Statistics for workers threads.",
		.read_only = true
	},

	/* "stats worker self [(count|cpu)]" -> cmd_stats_worker() */
	{
		.parent = "stats worker",
		.add_name = true,
		.name = "self",
		.syntax = "[(count|cpu)]",
		.func = cmd_stats_worker,
		.help = "Show statistics for a specific worker thread.",
		.read_only = true
	},

};
static int const char char buffer[256]
Definition acutest.h:576
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition app_io.h:55
size_t default_reply_size
same for replies
Definition app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:220
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition build.h:168
#define RCSID(id)
Definition build.h:488
#define NDEBUG_UNUSED
Definition build.h:329
#define CMP_PREFER_LARGER(_a, _b)
Evaluates to -1 for a > b, and +1 for a < b.
Definition build.h:109
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:384
#define UNUSED
Definition build.h:318
unlang_action_t unlang_call_push(unlang_result_t *p_result, request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition call.c:147
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:151
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:181
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:683
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:934
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:893
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:470
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:622
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:881
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:509
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:810
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:852
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:948
A full channel, which consists of two ends.
Definition channel.c:142
fr_message_t m
the message header
Definition channel.h:107
fr_channel_event_t
Definition channel.h:69
@ FR_CHANNEL_NOOP
Definition channel.h:76
@ FR_CHANNEL_EMPTY
Definition channel.h:77
@ FR_CHANNEL_CLOSE
Definition channel.h:74
@ FR_CHANNEL_ERROR
Definition channel.h:70
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:72
@ FR_CHANNEL_OPEN
Definition channel.h:73
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:71
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:144
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:148
#define FR_CONTROL_ID_CHANNEL
Definition channel.h:67
uint32_t priority
Priority of this packet.
Definition channel.h:142
Channel information which is added to a message.
Definition channel.h:106
int argc
current argument count
Definition command.h:39
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:141
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:158
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define DEBUG(fmt,...)
Definition dhcpclient.c:38
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:128
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:654
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:120
Head of a doubly linked list.
Definition dlist.h:51
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:325
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for a current request.
Definition interpret.c:940
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:2034
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition interpret.c:1549
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:2065
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
Definition interpret.c:1993
bool unlang_request_is_cancelled(request_t const *request)
Return whether a request has been cancelled.
Definition interpret.c:1599
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1417
bool unlang_interpret_is_resumable(request_t *request)
Check if a request as resumable.
Definition interpret.c:1618
#define UNLANG_REQUEST_RESUME
Definition interpret.h:43
#define UNLANG_TOP_FRAME
Definition interpret.h:36
External functions provided by the owner of the interpret.
Definition interpret.h:110
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
The control structure.
Definition control.c:76
void const * app_instance
Definition listen.h:39
fr_app_t const * app
Definition listen.h:38
void const * app_io_instance
I/O path configuration context.
Definition listen.h:33
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:42
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
Minimal data structure to use the new code.
Definition listen.h:63
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RWARN(fmt,...)
Definition log.h:309
#define RERROR(fmt,...)
Definition log.h:310
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RPERROR(fmt,...)
Definition log.h:314
#define RPEDEBUG(fmt,...)
Definition log.h:388
#define RDEBUG4(fmt,...)
Definition log.h:356
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:653
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2177
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2045
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2023
#define fr_time()
Definition event.c:60
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Definition event.c:2366
Stores all information relating to an event list.
Definition event.c:377
fr_log_lvl_t
Definition log.h:64
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
unsigned int uint32_t
long int ssize_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1250
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:946
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
static const conf_parser_t config[]
Definition base.c:163
#define fr_assert(_expr)
Definition rad_assert.h:37
#define REDEBUG(fmt,...)
#define RDEBUG(fmt,...)
#define DEBUG2(fmt,...)
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_remove(fr_rb_tree_t *tree, void const *data)
Remove an entry from the tree, without freeing the data.
Definition rb.c:695
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:244
static bool fr_rb_node_inline_in_tree(fr_rb_node_t const *node)
Check to see if an item is in a tree by examining its inline fr_rb_node_t.
Definition rb.h:312
The main red black tree structure.
Definition rb.h:71
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:44
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
Definition rcode.h:56
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
Definition request.c:383
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:542
#define REQUEST_VERIFY(_x)
Definition request.h:309
#define request_is_detached(_x)
Definition request.h:186
#define request_is_external(_x)
Definition request.h:184
#define REQUEST_POOL_HEADERS
Definition request.h:66
#define request_is_internal(_x)
Definition request.h:185
@ REQUEST_TYPE_EXTERNAL
A request received on the wire.
Definition request.h:178
#define request_is_detachable(_x)
Definition request.h:187
#define request_init(_ctx, _type, _args)
Definition request.h:321
#define REQUEST_POOL_SIZE
Definition request.h:79
Optional arguments for initialising requests.
Definition request.h:287
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
static char const * name
@ FR_SIGNAL_DUP
A duplicate request was received.
Definition signal.h:44
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
Definition slab.h:124
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
Definition slab.h:75
unsigned int num_children
How many child allocations are expected off each element.
Definition slab.h:48
size_t child_pool_size
Size of pool space to be allocated to each element.
Definition slab.h:49
return count
Definition module.c:155
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define _Atomic(T)
Definition stdatomic.h:77
Definition log.h:93
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:253
#define talloc_strdup(_ctx, _str)
Definition talloc.h:142
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition time.c:547
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition time.c:592
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_eq(_a, _b)
Definition time.h:241
#define NSEC
Definition time.h:379
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
Definition timer.c:1148
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
Definition timer.c:1290
int fr_timer_list_force_run(fr_timer_list_t *tl)
Forcibly run all events in an event loop.
Definition timer.c:920
An event timer list.
Definition timer.c:49
#define fr_timer_in(...)
Definition timer.h:87
static bool fr_timer_armed(fr_timer_t *ev)
Definition timer.h:120
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition compile.c:2295
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:732
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition worker.c:109
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition worker.c:569
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition worker.c:1252
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition worker.c:243
fr_event_list_t * el
our event list
Definition worker.c:105
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1562
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition worker.c:586
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1622
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition worker.c:116
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition worker.c:1301
static void worker_exit(fr_worker_t *worker)
Definition worker.c:221
#define WORKER_VERIFY
Definition worker.c:67
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition worker.c:128
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition worker.c:1715
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition worker.c:1241
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition worker.c:711
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1350
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition worker.c:987
fr_worker_channel_t * channel
list of channels
Definition worker.c:131
char const * name
name of this worker
Definition worker.c:91
uint64_t num_active
number of active requests
Definition worker.c:123
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1753
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition worker.c:539
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition worker.c:1698
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
Definition worker.c:764
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
Definition worker.c:1176
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition worker.c:1261
fr_rb_tree_t * dedup
de-dup tree
Definition worker.c:114
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition worker.c:101
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition worker.c:409
static void worker_request_name_number(request_t *request)
Definition worker.c:751
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition worker.c:519
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition worker.c:769
fr_log_t const * log
log destination
Definition worker.c:98
fr_io_stats_t stats
input / output stats
Definition worker.c:118
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition worker.c:1209
static int _fr_worker_rb_free(void *arg)
Definition worker.c:161
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition worker.c:126
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition worker.c:1090
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1010
uint64_t num_naks
number of messages which were nak'd
Definition worker.c:122
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition worker.c:734
fr_worker_config_t config
external configuration
Definition worker.c:92
fr_listen_t const * listener
incoming packets
Definition worker.c:137
unlang_interpret_t * intp
Worker's local interpreter.
Definition worker.c:94
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:360
static void worker_stop_request(request_t *request)
Signal the unlang interpreter that it needs to stop running the request.
Definition worker.c:500
static void _worker_request_prioritise(request_t *request, void *uctx)
Update a request's priority.
Definition worker.c:1278
bool exiting
are we exiting?
Definition worker.c:129
fr_log_lvl_t lvl
log level
Definition worker.c:99
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition worker.c:212
int num_channels
actual number of channels
Definition worker.c:107
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.c:112
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition worker.c:119
fr_rb_node_t node
in tree of listeners
Definition worker.c:139
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1649
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1583
fr_dlist_head_t dlist
of requests associated with this listener.
Definition worker.c:145
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1504
request_slab_list_t * slab
slab allocator for request_t
Definition worker.c:133
static uint32_t worker_num_requests(fr_worker_t *worker)
Definition worker.c:759
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition worker.c:125
pthread_t thread_id
my thread ID
Definition worker.c:96
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition worker.c:202
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition worker.c:120
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition worker.c:149
static bool is_worker_thread(fr_worker_t const *worker)
Definition worker.c:188
fr_worker_channel_t
Definition worker.c:85
fr_timer_list_t * timeout
Track when requests timeout using a dlist.
Definition worker.c:111
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition worker.c:170
fr_control_t * control
the control plane
Definition worker.c:103
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition worker.c:1270
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition worker.c:1160
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition worker.c:1595
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition worker.c:1069
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition worker.c:961
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition worker.c:387
A worker which takes packets from a master, and processes them.
Definition worker.c:90
int message_set_size
default start number of messages
Definition worker.h:73
#define FR_CONTROL_ID_LISTEN_DEAD
Definition worker.h:40
int max_requests
max requests this worker will handle
Definition worker.h:69
int max_channels
maximum number of channels
Definition worker.h:71
fr_slab_config_t reuse
slab allocator configuration
Definition worker.h:78
int ring_buffer_size
default start size for the ring buffers
Definition worker.h:74
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:76