The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
worker.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17 /**
18 * $Id: cfd8186f5e1a6ef01ba3c22e2c22c916f0a97546 $
19 *
20 * @brief Worker thread functions.
21 * @file io/worker.c
22 *
23 * The "worker" thread is the one responsible for the bulk of the
24 * work done when processing a request. Workers are spawned by the
25 * scheduler, and create a kqueue (KQ) and control-plane
26 * Atomic Queue (AQ) for control-plane communication.
27 *
28 * When a network thread discovers that it needs more workers, it
29 * asks the scheduler for a KQ/AQ combination. The network thread
30 * then creates a channel dedicated to that worker, and sends the
31 * channel to the worker in a "new channel" message. The worker
32 * receives the channel, and sends an ACK back to the network thread.
33 *
34 * The network thread then sends the worker new packets, which the
35 * worker receives and processes.
36 *
37 * When a packet is decoded, it is put into the "runnable" heap, and
38 * also into the timeout sublist. The main loop fr_worker() then
39 * pulls new requests off of this heap and runs them. The main event
40 * loop checks the head of the timeout sublist, and forcefully terminates
41 * any requests which have been running for too long.
42 *
43 * If a request is yielded, it is placed onto the yielded list in
44 * the worker "tracking" data structure.
45 *
46 * @copyright 2016 Alan DeKok (aland@freeradius.org)
47 */
48
49RCSID("$Id: cfd8186f5e1a6ef01ba3c22e2c22c916f0a97546 $")
50
51#define LOG_PREFIX worker->name
52#define LOG_DST worker->log
53
54#include <freeradius-devel/io/channel.h>
55#include <freeradius-devel/io/listen.h>
56#include <freeradius-devel/io/message.h>
57#include <freeradius-devel/io/worker.h>
58#include <freeradius-devel/unlang/base.h>
59#include <freeradius-devel/unlang/call.h>
60#include <freeradius-devel/unlang/interpret.h>
61#include <freeradius-devel/server/request.h>
62#include <freeradius-devel/server/time_tracking.h>
63#include <freeradius-devel/util/dlist.h>
64#include <freeradius-devel/util/minmax_heap.h>
65#include <freeradius-devel/util/slab.h>
66#include <freeradius-devel/util/time.h>
67#include <freeradius-devel/util/timer.h>
68
69#include <stdalign.h>
70
71#ifdef WITH_VERIFY_PTR
72static void worker_verify(fr_worker_t *worker);
73#define WORKER_VERIFY worker_verify(worker)
74#else
75#define WORKER_VERIFY
76#endif
77
78#define CACHE_LINE_SIZE 64
79static _Atomic(uint64_t) request_number = 0;
80
83
84static _Thread_local fr_ring_buffer_t *fr_worker_rb;
85
typedef struct {
	fr_channel_t		*ch;		//!< channel connecting this worker to a network thread

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;		//!< requests outstanding on this channel
	/* NOTE(review): the closing "} fr_worker_channel_t;" line appears to be
	 * missing from this extract — verify against upstream. */
/**
 *  A worker which takes packets from a master, and processes them.
 */
/* NOTE(review): the opening line of this struct definition (the
 * "struct ... {" line, typedef'd to fr_worker_t elsewhere) is missing from
 * this extract — verify against upstream. */
	char const		*name;		//!< name of this worker
	fr_worker_config_t	config;		//!< external configuration

	unlang_interpret_t	*intp;		//!< Worker's local interpreter.

	pthread_t		thread_id;	//!< my thread ID

	fr_log_t const		*log;		//!< log destination
	fr_log_lvl_t		lvl;		//!< log level

	fr_atomic_queue_t	*aq_control;	//!< atomic queue for control messages sent to me

	fr_control_t		*control;	//!< the control plane

	fr_event_list_t		*el;		//!< our event list

	int			num_channels;	//!< actual number of channels

	fr_heap_t		*runnable;	//!< current runnable requests which we've spent time processing

	fr_timer_list_t		*timeout;	//!< Track when requests timeout using a dlist.
	fr_time_delta_t		max_request_time;	//!< maximum time a request can be processed

	fr_rb_tree_t		*dedup;		//!< de-dup tree

	fr_rb_tree_t		*listeners;	//!< so we can cancel requests when a listener goes away

	fr_io_stats_t		stats;		//!< input / output stats
	fr_time_elapsed_t	cpu_time;	//!< histogram of total CPU time per request
	fr_time_elapsed_t	wall_clock;	//!< histogram of wall clock time per request

	uint64_t		num_naks;	//!< number of messages which were nak'd
	uint64_t		num_active;	//!< number of active requests

	fr_time_delta_t		predicted;	//!< How long we predict a request will take to execute.
	fr_time_tracking_t	tracking;	//!< how much time the worker has spent doing things.

	bool			was_sleeping;	//!< used to suppress multiple sleep signals in a row
	bool			exiting;	//!< are we exiting?

	fr_worker_channel_t	*channel;	//!< list of channels

	request_slab_list_t	*slab;		//!< slab allocator for request_t
};
144
typedef struct {
	fr_listen_t const	*listener;	//!< incoming packets

	fr_rb_node_t		node;		//!< in tree of listeners

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;		//!< of requests associated with this listener.
	/* NOTE(review): the closing "} fr_worker_listen_t;" line appears to be
	 * missing from this extract — verify against upstream. */
157
158static int8_t worker_listener_cmp(void const *one, void const *two)
159{
160 fr_worker_listen_t const *a = one, *b = two;
161
162 return CMP(a->listener, b->listener);
163}
164
165
/** Destructor trampoline for the thread-local worker ring buffer
 *
 * Explicitly releases the ring buffer's memory so that tools such as
 * valgrind don't report it as leaked at thread exit.
 */
static int _fr_worker_rb_free(void *arg)
{
	int ret;

	ret = talloc_free(arg);

	return ret;
}
174
/** Initialise thread local storage
 *
 * Lazily creates a per-thread ring buffer for worker messages, caching
 * it in the fr_worker_rb thread-local for subsequent calls.
 *
 * @return fr_ring_buffer_t for messages
 */
/* NOTE(review): the function signature line is missing from this extract
 * (a static fr_ring_buffer_t * initialiser) — verify against upstream. */
{
	/* NOTE(review): the declaration of rb is missing from this extract. */

	rb = fr_worker_rb;
	if (rb) return rb;	/* already initialised for this thread */

	/* NOTE(review): the ring buffer allocation call is missing from this
	 * extract — verify against upstream. */
	if (!rb) {
		fr_perror("Failed allocating memory for worker ring buffer");
		return NULL;
	}

	/* NOTE(review): the registration of the thread-local cleanup handler
	 * (_fr_worker_rb_free) is missing from this extract. */

	return rb;
}
196
197static inline bool is_worker_thread(fr_worker_t const *worker)
198{
199 return (pthread_equal(pthread_self(), worker->thread_id) != 0);
200}
201
203static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
204
205/** Callback which handles a message being received on the worker side.
206 *
207 * @param[in] ctx the worker
208 * @param[in] ch the channel to drain
209 * @param[in] cd the message (if any) to start with
210 */
211static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
212{
213 fr_worker_t *worker = ctx;
214
215 worker->stats.in++;
216 DEBUG3("Received request %" PRIu64 "", worker->stats.in);
217 cd->channel.ch = ch;
218 worker_request_bootstrap(worker, cd, fr_time());
219}
220
/* NOTE(review): the function signature line is missing from this extract —
 * callers pass a fr_worker_channel_t * (see the channel close handling), so
 * this is presumably static void worker_requests_cancel(fr_worker_channel_t *ch)
 * — verify against upstream. */
{
	request_t *request;

	/*
	 *	Drain every request still attached to this channel.
	 */
	while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
		/* NOTE(review): the loop body (presumably a call stopping the
		 * request) is missing from this extract — verify upstream. */
	}
}
229
230static void worker_exit(fr_worker_t *worker)
231{
232 worker->exiting = true;
233
234 /*
235 * Don't allow the post event to run
236 * any more requests. They'll be
237 * signalled to stop before we exit.
238 *
239 * This only has an effect in single
240 * threaded mode.
241 */
242 (void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
243}
244
/** Handle a control plane message sent to the worker via a channel
 *
 * Services channel events: data-ready (drain requests), open (claim a
 * slot in the worker's channel array and attach a message set), and
 * close (cancel outstanding requests, free the message set, and exit
 * the worker if it was the last channel).
 *
 * @param[in] ctx	the worker
 * @param[in] data	the message
 * @param[in] data_size	size of the data
 * @param[in] now	the current time
 */
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	int		i;
	bool		ok, was_sleeping;
	fr_channel_t	*ch;
	/* NOTE(review): declarations of the channel event (ce) and message set
	 * (ms) variables are missing from this extract — verify upstream. */
	fr_worker_t	*worker = ctx;

	was_sleeping = worker->was_sleeping;
	worker->was_sleeping = false;

	/*
	 *	We were woken up by a signal to do something.  We're
	 *	not sleeping.
	 */
	ce = fr_channel_service_message(now, &ch, data, data_size);
	DEBUG3("Channel %s",
	       fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
	switch (ce) {
	case FR_CHANNEL_ERROR:
		return;

	case FR_CHANNEL_EMPTY:
		return;

	case FR_CHANNEL_NOOP:
		return;

	/* NOTE(review): a case label is missing from this extract here; the
	 * branch below is a "should never happen" assertion — verify upstream. */
		fr_assert(0 == 1);
		break;

	/* NOTE(review): a case label is missing from this extract here (the
	 * data-ready event for the worker side) — verify upstream. */
		fr_assert(ch != NULL);

		/*
		 *	If nothing was read, restore the previous sleep state;
		 *	otherwise drain the channel completely.
		 */
		if (!fr_channel_recv_request(ch)) {
			worker->was_sleeping = was_sleeping;

		} else while (fr_channel_recv_request(ch));
		break;

	case FR_CHANNEL_OPEN:
		fr_assert(ch != NULL);

		ok = false;
		for (i = 0; i < worker->config.max_channels; i++) {
			fr_assert(worker->channel[i].ch != ch);

			if (worker->channel[i].ch != NULL) continue;

			worker->channel[i].ch = ch;
			fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);

			DEBUG3("Received channel %p into array entry %d", ch, i);

			ms = fr_message_set_create(worker, worker->config.message_set_size,
						   sizeof(fr_channel_data_t),
						   worker->config.ring_buffer_size, false);
			fr_assert(ms != NULL);
			/* NOTE(review): the call registering ms with the channel is
			 * missing from this extract — verify upstream. */

			worker->num_channels++;
			ok = true;
			break;
		}

		fr_cond_assert(ok);
		break;

	case FR_CHANNEL_CLOSE:
		fr_assert(ch != NULL);

		ok = false;

		/*
		 *	Locate the signalling channel in the list
		 *	of channels.
		 */
		for (i = 0; i < worker->config.max_channels; i++) {
			if (!worker->channel[i].ch) continue;

			if (worker->channel[i].ch != ch) continue;

			worker_requests_cancel(&worker->channel[i]);

			/* NOTE(review): a line is missing from this extract here —
			 * verify against upstream. */

			fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
				      "Network added messages to channel after sending FR_CHANNEL_CLOSE");

			/* NOTE(review): the line(s) retrieving the channel's message
			 * set (ms) are missing from this extract — verify upstream. */
			fr_assert(ms != NULL);
			talloc_free(ms);

			worker->channel[i].ch = NULL;

			fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
			fr_assert(worker->num_channels > 0);

			worker->num_channels--;
			ok = true;
			break;
		}

		fr_cond_assert(ok);

		/*
		 *	Our last input channel closed,
		 *	time to die.
		 */
		if (worker->num_channels == 0) worker_exit(worker);
		break;
	}
}
368
/* NOTE(review): the function signature line is missing from this extract —
 * it is invoked as fr_worker_listen_cancel_self(worker, li) and returns int
 * (0 on success, -1 when the listener is unknown) — verify upstream. */
{
	/* NOTE(review): the declaration of wl (fr_worker_listen_t *) is missing
	 * from this extract. */
	request_t *request;

	wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
	if (!wl) return -1;	/* nothing is tracked for this listener */

	/*
	 *	Stop every request still associated with the listener.
	 */
	while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
		RDEBUG("Canceling request due to socket being closed");
		/* NOTE(review): the call stopping the request is missing from this
		 * extract — verify upstream. */
	}

	(void) fr_rb_delete(worker->listeners, wl);
	talloc_free(wl);

	return 0;
}
387
388
389/** A socket is going away, so clean up any requests which use this socket.
390 *
391 * @param[in] ctx the worker
392 * @param[in] data the message
393 * @param[in] data_size size of the data
394 * @param[in] now the current time
395 */
396static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
397{
398 fr_listen_t const *li;
399 fr_worker_t *worker = ctx;
400
401 fr_assert(data_size == sizeof(li));
402
403 memcpy(&li, data, sizeof(li));
404
405 (void) fr_worker_listen_cancel_self(worker, li);
406}
407
/** Send a NAK to the network thread
 *
 * The network thread believes that a worker is running a request until that request has been NAK'd.
 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
 * or there was an error executing the request.
 *
 * @param[in] worker	the worker
 * @param[in] cd	the message to NAK
 * @param[in] now	when the message is NAKd
 */
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	size_t			size;
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	/* NOTE(review): the declaration of the message set (ms) is missing
	 * from this extract — verify upstream. */
	fr_listen_t		*listen;

	worker->num_naks++;

	/*
	 *	Cache the outbound channel.  We'll need it later.
	 */
	ch = cd->channel.ch;
	listen = cd->listen;

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
		fr_message_done(&cd->m);
		return;
	}

	/* NOTE(review): the line fetching the channel's message set (ms) is
	 * missing from this extract — verify upstream. */
	fr_assert(ms != NULL);

	size = listen->app_io->default_reply_size;
	if (!size) size = listen->app_io->default_message_size;

	/*
	 *	Allocate a default message size.
	 */
	MEM(reply = (fr_channel_data_t *) fr_message_reserve(ms, size));

	/*
	 *	Encode a NAK
	 */
	if (listen->app_io->nak) {
		size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
					   cd->m.data_size, reply->m.data, reply->m.rb_size);
	} else {
		size = 1;	/* rely on them to figure it the heck out */
	}

	(void) fr_message_alloc(ms, &reply->m, size);

	/*
	 *	Fill in the NAK.
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = fr_time_delta_from_msec(1);	/* @todo - set to something better? */
	reply->reply.request_time = cd->request.recv_time;

	reply->listen = cd->listen;
	reply->packet_ctx = cd->packet_ctx;

	/*
	 *	Mark the original message as done.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		DEBUG2("Failed sending reply to channel");
	}

	worker->stats.out++;
}
494
/** Signal the unlang interpreter that it needs to stop running the request
 *
 * Signalling is a synchronous operation.  Whatever I/O requests the request
 * is currently performing are immediately cancelled, and all the frames are
 * popped off the unlang stack.
 *
 * Modules and unlang keywords explicitly register signal handlers to deal
 * with their yield points being cancelled/interrupted via this function.
 *
 * The caller should assume the request is no longer viable after calling
 * this function.
 *
 * @param[in] request request to cancel.  The request may still run to completion.
 */
static void worker_stop_request(request_t *request)
{
	/*
	 *	Also marks the request as done and runs
	 *	the internal/external callbacks.
	 */
	/* NOTE(review): the body (presumably the unlang signal call that cancels
	 * the request) is missing from this extract — verify against upstream. */
}
517
/** Enforce max_request_time
 *
 * Run periodically, and tries to clean up requests which were received by the network
 * thread more than max_request_time seconds ago.  In the interest of not adding a
 * timer for every packet, the requests are given a 1 second leeway.
 *
 * @param[in] tl	the worker's timer list.
 * @param[in] when	the current time
 * @param[in] uctx	the request_t timing out.
 */
/* NOTE(review): the function signature line is missing from this extract —
 * this is the timer callback armed in worker_request_time_tracking_start()
 * with the request as uctx — verify against upstream. */
{
	request_t *request = talloc_get_type_abort(uctx, request_t);

	/*
	 *	Waiting too long, delete it.
	 */
	REDEBUG("Request has reached max_request_time - signalling it to stop");
	worker_stop_request(request);

	/*
	 *	This ensures the finally section can run timeout specific policies
	 */
	request->rcode = RLM_MODULE_TIMEOUT;
}
543
544
/** Start time tracking for a request, and mark it as runnable.
 *
 * Arms the per-request max_request_time timer, starts CPU time
 * tracking (initially in the yielded state), and inserts the request
 * into the worker's runnable heap.
 *
 * @return 0 on success, -1 if the timeout timer could not be armed.
 */
/* NOTE(review): the function signature line is missing from this extract —
 * called as worker_request_time_tracking_start(worker, request, now) and
 * returns int — verify against upstream. */
{
	/*
	 *	New requests are inserted into the time order heap in
	 *	strict time priority.  Once they are in the list, they
	 *	are only removed when the request is done / free'd.
	 */
	fr_assert(!fr_timer_armed(request->timeout));

	if (unlikely(fr_timer_in(request, worker->timeout, &request->timeout, worker->config.max_request_time,
				 true, _worker_request_timeout, request) < 0)) {
		RERROR("Failed to set request timeout timer");
		return -1;
	}

	/*
	 *	Bootstrap the async state machine with the initial
	 *	state of the request.
	 */
	RDEBUG3("Time tracking started in yielded state");
	fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
	fr_time_tracking_yield(&request->async->tracking, now);
	worker->num_active++;

	fr_assert(!fr_heap_entry_inserted(request->runnable));
	(void) fr_heap_insert(&worker->runnable, request);

	return 0;
}
577
/** End time tracking for a request
 *
 * NOTE(review): the function signature line is missing from this extract —
 * called as worker_request_time_tracking_end(worker, request, now) —
 * verify against upstream.
 */
{
	RDEBUG3("Time tracking ended");
	fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
	fr_assert(worker->num_active > 0);
	worker->num_active--;

	TALLOC_FREE(request->timeout);	/* Disarm the request's timeout timer */
}
587
/** Send a response packet to the network side
 *
 * Reserves a message from the channel's message set, optionally encodes
 * the reply, fills in the timing metadata, and hands the message back
 * to the network thread via the channel.
 *
 * @param[in] worker	This worker.
 * @param[in] request	we're sending a reply for.
 * @param[in] send_reply	whether the network side sends a reply
 * @param[in] now	The current time
 */
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
{
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	/* NOTE(review): the declaration of the message set (ms) is missing
	 * from this extract — verify upstream. */
	size_t			size = 1;

	REQUEST_VERIFY(request);

	/*
	 *	If we're sending a reply, then it's no longer runnable.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	if (send_reply) {
		size = request->async->listen->app_io->default_reply_size;
		if (!size) size = request->async->listen->app_io->default_message_size;
	}

	/*
	 *	Allocate and send the reply.
	 */
	ch = request->async->channel;
	fr_assert(ch != NULL);

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
		return;
	}

	/* NOTE(review): the line fetching the channel's message set (ms) is
	 * missing from this extract — verify upstream. */
	fr_assert(ms != NULL);

	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode it, if required.
	 */
	if (send_reply) {
		ssize_t slen = 0;
		fr_listen_t const *listen = request->async->listen;

		if (listen->app_io->encode) {
			slen = listen->app_io->encode(listen->app_io_instance, request,
						      reply->m.data, reply->m.rb_size);
		} else if (listen->app->encode) {
			slen = listen->app->encode(listen->app_instance, request,
						   reply->m.data, reply->m.rb_size);
		}
		if (slen < 0) {
			RPERROR("Failed encoding request");
			*reply->m.data = 0;
			slen = 1;
		}

		/*
		 *	Shrink the buffer to the actual packet size.
		 *
		 *	This will ALWAYS return the same message as we put in.
		 */
		fr_assert((size_t) slen <= reply->m.rb_size);
		(void) fr_message_alloc(ms, &reply->m, slen);
	}

	/*
	 *	Fill in the rest of the fields in the channel message.
	 *
	 *	sequence / ack will be filled in by fr_channel_send_reply()
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = request->async->tracking.running_total;
	reply->reply.request_time = request->async->recv_time;

	reply->listen = request->async->listen;
	reply->packet_ctx = request->async->packet_ctx;

	/*
	 *	Update the various timers.
	 */
	fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
	fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);

	RDEBUG("Finished request");

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		/*
		 *	Should only happen if the TO_REQUESTOR
		 *	channel is full, or it's not yet active.
		 *
		 *	Not much we can do except complain
		 *	loudly and cleanup the request.
		 */
		RPERROR("Failed sending reply to network thread");
	}

	worker->stats.out++;

	fr_assert(!fr_timer_armed(request->timeout));
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	fr_dlist_entry_unlink(&request->listen_entry);

#ifndef NDEBUG
	request->async->el = NULL;
	request->async->channel = NULL;
	request->async->packet_ctx = NULL;
	request->async->listen = NULL;
#endif
}
715
716/*
717 * talloc_typed_asprintf() is horrifically slow for printing
718 * simple numbers.
719 */
720static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
721{
722 char buffer[32];
723 char *p;
724 char const *numbers = "0123456789";
725
726 p = buffer + 30;
727 *(p--) = '\0';
728
729 while (number > 0) {
730 *(p--) = numbers[number % 10];
731 number /= 10;
732 }
733
734 if (p[1]) return talloc_strdup(ctx, p + 1);
735
736 return talloc_strdup(ctx, "0");
737}
738
/** Initialize various request fields needed by the worker.
 *
 * Allocates packet/reply buffers if absent, creates the async state
 * and stamps the receive time.
 */
static inline CC_HINT(always_inline)
/* NOTE(review): the remainder of the signature is missing from this extract —
 * called as worker_request_init(worker, request, now) — verify upstream. */
{
	/*
	 *	For internal requests request->packet
	 *	and request->reply are already populated.
	 */
	if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
	if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));

	request->packet->timestamp = now;
	request->async = talloc_zero(request, fr_async_t);
	request->async->recv_time = now;
	request->async->el = worker->el;	/* requests run on the worker's event list */
	fr_dlist_entry_init(&request->async->entry);
}
758
/** Assign a request its globally unique number and matching name
 *
 * The number comes from an atomic counter shared by all workers.
 */
static inline CC_HINT(always_inline)
/* NOTE(review): the remainder of the signature is missing from this extract —
 * verify against upstream. */
{
	request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
	if (request->name) talloc_const_free(request->name);
	request->name = itoa_internal(request, request->number);
}
766
/** Number of requests currently in flight on this worker
 *
 * Measured as the number of armed per-request timeout timers.
 */
static inline CC_HINT(always_inline)
/* NOTE(review): the remainder of the signature is missing from this extract —
 * verify against upstream. */
{
	return fr_timer_list_num_events(worker->timeout);
}
772
773static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
774{
775 return request_slab_deinit(request);
776}
777
/** Turn a channel message into a request_t and make it runnable
 *
 * NOTE(review): the function signature line is missing from this extract —
 * it is invoked as worker_request_bootstrap(worker, cd, now) from
 * worker_recv_request() — verify against upstream.
 */
{
	int		ret = -1;
	request_t	*request;
	fr_listen_t	*listen = cd->listen;

	if (worker_num_requests(worker) >= (uint32_t) worker->config.max_requests) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker at max requests");
		goto nak;
	}

	/*
	 *	Receive a message to the worker queue, and decode it
	 *	to a request.
	 */
	fr_assert(listen != NULL);

	request = request_slab_reserve(worker->slab);
	if (!request) {
		RATE_LIMIT_GLOBAL(ERROR, "Worker failed allocating new request");
		goto nak;
	}
	/*
	 *	Ensures that both the deinit function runs AND
	 *	the request is returned to the slab if something
	 *	calls talloc_free() on it.
	 */
	request_slab_element_set_destructor(request, _worker_request_deinit, worker);

	/*
	 *	Have to initialise the request manually because namespace
	 *	changes based on the listener that allocated it.
	 */
	if (request_init(request, REQUEST_TYPE_EXTERNAL, (&(request_init_args_t){ .namespace = listen->dict })) < 0) {
		request_slab_release(request);
		goto nak;
	}

	/*
	 *	Do normal worker init that's shared between internal
	 *	and external requests.
	 */
	worker_request_init(worker, request, now);
	/* NOTE(review): a line is missing from this extract here (presumably
	 * the call assigning the request's name/number) — verify upstream. */

	/*
	 *	Associate our interpreter with the request
	 */
	unlang_interpret_set(request, worker->intp);

	request->packet->timestamp = cd->request.recv_time;	/* Legacy - Remove once everything looks at request->async */

	/*
	 *	Update the transport-specific fields.
	 */
	request->async->channel = cd->channel.ch;

	request->async->recv_time = cd->request.recv_time;

	request->async->listen = listen;
	request->async->packet_ctx = cd->packet_ctx;
	request->priority = cd->priority;

	/*
	 *	Now that the "request" structure has been initialized, go decode the packet.
	 *
	 *	Note that this also sets the "async process" function.
	 */
	if (listen->app->decode) {
		ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
	} else if (listen->app_io->decode) {
		ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
	}

	if (ret < 0) {
	fail:
		request_slab_release(request);

	nak:
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	Set the entry point for this virtual server.
	 */
	if (unlang_call_push(NULL, request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
		RERROR("Protocol failed to set 'process' function");
		goto fail;
	}

	/*
	 *	Look for conflicting / duplicate packets, but only if
	 *	requested to do so.
	 */
	if (request->async->listen->track_duplicates) {
		request_t *old;

		old = fr_rb_find(worker->dedup, request);
		if (!old) {
			goto insert_new;
		}

		fr_assert(old->async->listen == request->async->listen);
		fr_assert(old->async->channel == request->async->channel);

		/*
		 *	There's a new packet.  Do we keep the old one,
		 *	or the new one?  This decision is made by
		 *	checking the recv_time, which is a
		 *	nanosecond-resolution timer.  If the time is
		 *	identical, then the new packet is the same as
		 *	the old one.
		 *
		 *	If the new packet is a duplicate of the old
		 *	one, then we can just discard the new one.  We
		 *	have to tell the channel that we've "eaten"
		 *	this reply, so the sequence number should
		 *	increase.
		 *
		 *	@todo - fix the channel code to do queue
		 *	depth, and not sequence / ack.
		 */
		if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
			RWARN("Discarding duplicate of request (%"PRIu64")", old->number);

			fr_channel_null_reply(request->async->channel);
			request_slab_release(request);

			/*
			 *	Signal there's a dup, and ignore the
			 *	return code.  We don't bother replying
			 *	here, as an FD event or timer will
			 *	wake up the request, and cause it to
			 *	continue.
			 *
			 *	@todo - the old request is NOT
			 *	running, but is yielded.  It MAY clean
			 *	itself up, or do something...
			 */
			/* NOTE(review): the line signalling the old request about the
			 * duplicate is missing from this extract — verify upstream. */
			worker->stats.dup++;

			fr_message_done(&cd->m);
			return;
		}

		/*
		 *	Stop the old request, and decrement the number
		 *	of active requests.
		 */
		RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);

		/* NOTE(review): the call stopping the old request is missing from
		 * this extract — verify upstream. */
		worker->stats.dropped++;

	insert_new:
		(void) fr_rb_insert(worker->dedup, request);
	}

	if (worker_request_time_tracking_start(worker, request, now) < 0) {
		if (request->async->listen->track_duplicates) (void) fr_rb_remove(worker->dedup, request);
		goto fail;
	}

	/*
	 *	We're done with this message.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Track the request against its listener so it can be
	 *	cancelled if the listener goes away.
	 */
	{
		/* NOTE(review): the declaration of wl (fr_worker_listen_t *) is
		 * missing from this extract — verify upstream. */

		wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
		if (!wl) {
			MEM(wl = talloc_zero(worker, fr_worker_listen_t));
			fr_dlist_init(&wl->dlist, request_t, listen_entry);
			wl->listener = listen;

			(void) fr_rb_insert(worker->listeners, wl);
		}

		fr_dlist_insert_tail(&wl->dlist, request);
	}
}
963
964/**
965 * Track a request_t in the "runnable" heap.
966 * Higher priorities take precedence, followed by lower sequence numbers
967 */
968static int8_t worker_runnable_cmp(void const *one, void const *two)
969{
970 request_t const *a = one, *b = two;
971 int ret;
972
973 /*
974 * Prefer higher priority packets.
975 */
976 ret = CMP_PREFER_LARGER(b->priority, a->priority);
977 if (ret != 0) return ret;
978
979 /*
980 * Prefer packets which are further along in their processing sequence.
981 */
982 ret = CMP_PREFER_LARGER(a->sequence, b->sequence);
983 if (ret != 0) return ret;
984
985 /*
986 * Smaller timestamp (i.e. earlier) is more important.
987 */
988 return fr_time_cmp(a->async->recv_time, b->async->recv_time);
989}
990
991/**
992 * Track a request_t in the "dedup" tree
993 */
994static int8_t worker_dedup_cmp(void const *one, void const *two)
995{
996 int ret;
997 request_t const *a = one, *b = two;
998
999 ret = CMP(a->async->listen, b->async->listen);
1000 if (ret) return ret;
1001
1002 return CMP(a->async->packet_ctx, b->async->packet_ctx);
1003}
1004
/** Destroy a worker
 *
 * The input channels are signaled, and local messages are cleaned up.
 *
 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
 * error has occurred on the worker side, and we need to destroy it.
 *
 * We signal all pending requests in the backlog to stop, and tell the
 * network side that it should not send us any more requests.
 *
 * @param[in] worker the worker to destroy.
 */
/* NOTE(review): the function signature line is missing from this extract
 * (public API exported via worker.h) — verify against upstream. */
{
	int i, count, ret;

//	WORKER_VERIFY;

	/*
	 *	Stop any new requests running with this interpreter
	 */
	/* NOTE(review): the call disabling the interpreter is missing from this
	 * extract — verify against upstream. */

	/*
	 *	Destroy all of the active requests.  These are ones
	 *	which are still waiting for timers or file descriptor
	 *	events.
	 */
	count = 0;

	/*
	 *	Force the timeout event to fire for all requests that
	 *	are still running.
	 */
	ret = fr_timer_list_force_run(worker->timeout);
	if (unlikely(ret < 0)) {
		fr_assert_msg(0, "Failed to force run the timeout list");
	} else {
		count += ret;
	}

	/* NOTE(review): a line is missing from this extract here — verify
	 * against upstream. */

	DEBUG("Worker is exiting - stopped %u requests", count);

	/*
	 *	Signal the channels that we're closing.
	 *
	 *	The other end owns the channel, and will take care of
	 *	popping messages in the TO_RESPONDER queue, and marking
	 *	them FR_MESSAGE_DONE.  It will ignore the messages in
	 *	the TO_REQUESTOR queue, as we own those.  They will be
	 *	automatically freed when our talloc context is freed.
	 */
	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		worker_requests_cancel(&worker->channel[i]);

		fr_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
			      "Pending messages in channel after cancelling request");

		/* NOTE(review): the channel close/signal call is missing from this
		 * extract — verify against upstream. */
	}

	talloc_free(worker);
}
1072
/** Initialise a request generated internally by the interpreter
 *
 * NOTE(review): the original header comment read "is now complete", which
 * matches the *_done_* callbacks rather than this init hook — presumably a
 * copy/paste slip; verify against upstream.
 */
static void _worker_request_internal_init(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	worker_request_init(worker, request, now);

	/*
	 *	Requests generated by the interpreter
	 *	are always marked up as internal.
	 */
	/* NOTE(review): a line is missing from this extract here (presumably an
	 * assertion or flag marking the request internal) — verify upstream. */
	if (worker_request_time_tracking_start(worker, request, now) < 0) {
		/* NOTE(review): the error-path body is missing from this extract —
		 * verify against upstream. */
	}
}
1092
1093
/** External request is now complete
 *
 * Removes the request from the dedup tree and channel/listener lists,
 * ends time tracking, sends the reply (unless the request was
 * cancelled), and releases the request back to the slab allocator.
 */
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	/*
	 *	All external requests MUST have a listener.
	 */
	/* NOTE(review): a line is missing from this extract here (presumably an
	 * assertion that the request is external) — verify upstream. */
	fr_assert(request->async->listen != NULL);

	/*
	 *	Only real packets are in the dedup tree.  And even
	 *	then, only some of the time.
	 */
	if (request->async->listen->track_duplicates) {
		(void) fr_rb_delete(worker->dedup, request);
	}

	/*
	 *	If we're running a real request, then the final
	 *	indentation MUST be zero.  Otherwise we skipped
	 *	something!
	 *
	 *	Also check that the request is NOT marked as
	 *	"yielded", but is in fact done.
	 *
	 *	@todo - check that the stack is at frame 0, otherwise
	 *	more things have gone wrong.
	 */
	fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
		      "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
	/* NOTE(review): the opening fr_assert_msg(...) line for the following
	 * format string is missing from this extract — verify upstream. */
		      "Request %s is marked as yielded at end of processing", request->name);
	/* NOTE(review): likewise, the opening assertion line for the following
	 * format string is missing from this extract — verify upstream. */
		      "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
	RDEBUG("Done request");

	/*
	 *	The request is done.  Track that.
	 */
	worker_request_time_tracking_end(worker, request, now);

	/*
	 *	Remove it from the list of requests associated with this channel.
	 */
	if (fr_dlist_entry_in_list(&request->async->entry)) {
		fr_dlist_entry_unlink(&request->async->entry);
	}

	/*
	 *	These conditions are true when the server is
	 *	exiting and we're stopping all the requests.
	 *
	 *	This should never happen otherwise.
	 */
	if (unlikely(!fr_channel_active(request->async->channel))) {
		fr_dlist_entry_unlink(&request->listen_entry);
		request_slab_release(request);
		return;
	}

	worker_send_reply(worker, request, !unlang_request_is_cancelled(request), now);
	request_slab_release(request);
}
1162
1163/** Internal request (i.e. one generated by the interpreter) is now complete
1164 *
1165 * Whatever generated the request is now responsible for freeing it.
1166 */
1167static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
1168{
1169 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1170
1171 worker_request_time_tracking_end(worker, request, fr_time());
1172
1173 fr_assert(!fr_heap_entry_inserted(request->runnable));
1174 fr_assert(!fr_timer_armed(request->timeout));
1175 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
1176}
1177
/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
 *
 * As the request has no parent, then there's nothing to free it
 * so we have to.
 */
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
{
	/*
	 *	No time tracking for detached requests
	 *	so we don't need to call
	 *	worker_request_time_tracking_end.
	 *
	 *	A completed request must never still be sitting in the
	 *	runnable heap.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable));

	/*
	 *	Normally worker_request_time_tracking_end
	 *	would remove the request from the time
	 *	order heap, but we need to do that for
	 *	detached requests.
	 */
	TALLOC_FREE(request->timeout);

	/* The request must not be linked into any channel's request list. */
	fr_assert(!fr_dlist_entry_in_list(&request->async->entry));

	/*
	 *	Detached requests have to be freed by us
	 *	as nothing else can free them.
	 *
	 *	All other requests must be freed by the
	 *	code which allocated them.
	 */
	talloc_free(request);
}
1211
1212
1213/** Make us responsible for running the request
1214 *
1215 */
1216static void _worker_request_detach(request_t *request, void *uctx)
1217{
1218 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1219 fr_time_t now = fr_time();
1220
1221 RDEBUG4("%s - Request detaching", __FUNCTION__);
1222
1223 if (request_is_detachable(request)) {
1224 /*
1225 * End the time tracking... We don't track detached requests,
1226 * because they don't contribute for the time consumed by an
1227 * external request.
1228 */
1229 if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
1230 RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
1231 fr_time_tracking_resume(&request->async->tracking, now);
1232 }
1233 worker_request_time_tracking_end(worker, request, now);
1234
1235 if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
1236
1237 RDEBUG3("Request is detached");
1238 } else {
1239 fr_assert_msg(0, "Request is not detachable");
1240 }
1241
1242 return;
1243}
1244
1245/** Request is now runnable
1246 *
1247 */
1248static void _worker_request_runnable(request_t *request, void *uctx)
1249{
1250 fr_worker_t *worker = uctx;
1251
1252 RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
1253 fr_heap_insert(&worker->runnable, request);
1254}
1255
1256/** Interpreter yielded request
1257 *
1258 */
1259static void _worker_request_yield(request_t *request, UNUSED void *uctx)
1260{
1261 RDEBUG4("%s - Request yielded", __FUNCTION__);
1262 if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
1263}
1264
1265/** Interpreter is starting to work on request again
1266 *
1267 */
1268static void _worker_request_resume(request_t *request, UNUSED void *uctx)
1269{
1270 RDEBUG4("%s - Request resuming", __FUNCTION__);
1271 if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
1272}
1273
1274/** Check if a request is scheduled
1275 *
1276 */
1277static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
1278{
1279 return fr_heap_entry_inserted(request->runnable);
1280}
1281
1282/** Update a request's priority
1283 *
1284 */
1285static void _worker_request_prioritise(request_t *request, void *uctx)
1286{
1287 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1288
1289 RDEBUG4("%s - Request priority changed", __FUNCTION__);
1290
1291 /* Extract the request from the runnable queue _if_ it's in the runnable queue */
1292 if (fr_heap_extract(&worker->runnable, request) < 0) return;
1293
1294 /* Reinsert it to re-evaluate its new priority */
1295 fr_heap_insert(&worker->runnable, request);
1296}
1297
/** Run a request
 *
 * Until it either yields, or is done.
 *
 * This function is also responsible for sending replies, and
 * cleaning up the request.
 *
 * @param[in] worker	the worker
 * @param[in] start	the current time
 */
static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
{
	request_t *request;
	fr_time_t now;

	now = start;

	/*
	 *	Busy-loop running requests for 1ms.  We still poll the
	 *	event loop 1000 times a second, OR when there's no
	 *	more work to do.  This allows us to make progress with
	 *	ongoing requests, at the expense of sometimes ignoring
	 *	new ones.
	 */
	while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
	       ((request = fr_heap_pop(&worker->runnable)) != NULL)) {

		REQUEST_VERIFY(request);
		fr_assert(!fr_heap_entry_inserted(request->runnable));

		/*
		 *	For real requests, if the channel is gone,
		 *	just stop the request and free it.
		 */
		if (request->async->channel && !fr_channel_active(request->async->channel)) {
			worker_stop_request(request);
			continue;
		}

		/* NOTE(review): the statement which actually dispatches the request (presumably
		 * a call into the unlang interpreter) appears to have been lost in this extract -
		 * confirm against the upstream source. */

		now = fr_time();
	}
}
1344
/** Create a worker
 *
 * @param[in] ctx the talloc context
 * @param[in] name the name of this worker
 * @param[in] el the event list
 * @param[in] logger the destination for all logging messages
 * @param[in] lvl log level
 * @param[in] config various configuration parameters
 * @return
 *	- NULL on error
 *	- fr_worker_t on success
 */
/* NOTE(review): the trailing parameter(s) of this prototype (the "config" argument documented
 * above) appear to have been lost in this extract - confirm against the upstream source. */
fr_worker_t *fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
{
	fr_worker_t *worker;

	worker = talloc_zero(ctx, fr_worker_t);
	if (!worker) {
nomem:
		fr_strerror_const("Failed allocating memory");
		return NULL;
	}

	worker->name = talloc_strdup(worker, name); /* thread locality */

	if (config) worker->config = *config;

	/*
	 *	Clamp an integer config field into [_min, _max],
	 *	treating zero (unset) as _min.
	 */
#define CHECK_CONFIG(_x, _min, _max) do { \
		if (!worker->config._x) worker->config._x = _min; \
		if (worker->config._x < _min) worker->config._x = _min; \
		if (worker->config._x > _max) worker->config._x = _max; \
	} while (0)

	/*
	 *	Same clamp, for fr_time_delta_t config fields.
	 */
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
		if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
		if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
	} while (0)

	CHECK_CONFIG(max_requests,1024,(1 << 30));
	CHECK_CONFIG(max_channels, 64, 1024);
	CHECK_CONFIG(reuse.child_pool_size, 4096, 65536);
	CHECK_CONFIG(message_set_size, 1024, 8192);
	CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
	/* NOTE(review): a CHECK_CONFIG_TIME_DELTA() invocation appears to have been lost here
	 * in this extract - confirm against the upstream source. */

	worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
	if (!worker->channel) {
		talloc_free(worker);
		goto nomem;
	}

	worker->thread_id = pthread_self();
	worker->el = el;
	worker->log = logger;
	worker->lvl = lvl;

	/*
	 *	The worker thread starts now.  Manually initialize it,
	 *	because we're tracking request time, not the time that
	 *	the worker thread is running.
	 */
	memset(&worker->tracking, 0, sizeof(worker->tracking));

	worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
	if (!worker->aq_control) {
		fr_strerror_const("Failed creating atomic queue");
	fail:
		talloc_free(worker);
		return NULL;
	}

	worker->control = fr_control_create(worker, el, worker->aq_control);
	if (!worker->control) {
		fr_strerror_const_push("Failed creating control plane");
		goto fail;
	}

	/* NOTE(review): the conditions for the two error blocks below (presumably
	 * fr_control_callback_add() calls) appear to have been lost in this extract -
	 * confirm against the upstream source. */
		fr_strerror_const_push("Failed adding control channel");
		goto fail;
	}

		fr_strerror_const_push("Failed adding callback for listeners");
		goto fail;
	}

	worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable, 0);
	if (!worker->runnable) {
		fr_strerror_const("Failed creating runnable heap");
		goto fail;
	}

	worker->timeout = fr_timer_list_ordered_alloc(worker, el->tl);
	if (!worker->timeout) {
		fr_strerror_const("Failed creating timeouts list");
		goto fail;
	}

	worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
	if (!worker->dedup) {
		fr_strerror_const("Failed creating de_dup tree");
		goto fail;
	}

	/* NOTE(review): the assignment of worker->listeners appears to have been lost here
	 * in this extract - confirm against the upstream source. */
	if (!worker->listeners) {
		fr_strerror_const("Failed creating listener tree");
		goto fail;
	}

	/*
	 *	Hook this worker's callbacks into the interpreter.
	 *	NOTE(review): the compound-literal opener for the callback struct appears
	 *	to have been lost in this extract.
	 */
	worker->intp = unlang_interpret_init(worker, el,
						.init_internal = _worker_request_internal_init,

						.done_external = _worker_request_done_external,
						.done_internal = _worker_request_done_internal,
						.done_detached = _worker_request_done_detached,

						.detach = _worker_request_detach,
						.yield = _worker_request_yield,
						.resume = _worker_request_resume,
						.mark_runnable = _worker_request_runnable,

						.scheduled = _worker_request_scheduled,
						.prioritise = _worker_request_prioritise
					},
					worker);
	if (!worker->intp){
		fr_strerror_const("Failed initialising interpreter");
		goto fail;
	}

	{
		/* NOTE(review): slab-config setup lines appear to have been lost here
		 * in this extract - confirm against the upstream source. */

		if (!(worker->slab = request_slab_list_alloc(worker, el, &worker->config.reuse, NULL, NULL,
							     UNCONST(void *, worker), true, false))) {
			fr_strerror_const("Failed creating request slab list");
			goto fail;
		}
	}

	return worker;
}
1496
1497
/** The main loop and entry point of the stand-alone worker thread.
 *
 * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
 * instead, And then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
 * request.
 *
 * @param[in] worker the worker data structure to manage
 */
/* NOTE(review): the function signature line appears to have been lost in this extract
 * (per the doc comment this is fr_worker(), taking the worker as its argument). */
{

	while (true) {
		bool wait_for_event;
		int num_events;


		/*
		 *	There are runnable requests.  We still service
		 *	the event loop, but we don't wait for events.
		 */
		wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
		if (wait_for_event) {
			/*
			 *	Nothing left to run.  If we've been told to
			 *	exit and all requests have drained, stop.
			 */
			if (worker->exiting && (worker_num_requests(worker) == 0)) break;

			DEBUG4("Ready to process requests");
		}

		/*
		 *	Check the event list.  If there's an error
		 *	(e.g. exit), we stop looping and clean up.
		 */
		DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "Will not wait");
		num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
		if (num_events < 0) {
			if (fr_event_loop_exiting(worker->el)) {
				DEBUG4("Event loop exiting");
				break;
			}

			PERROR("Failed retrieving events");
			break;
		}

		DEBUG4("%u event(s) pending", num_events);

		/*
		 *	Service outstanding events.
		 */
		if (num_events > 0) {
			DEBUG4("Servicing event(s)");
			fr_event_service(worker->el);
		}

		/*
		 *	Run any outstanding requests.
		 */
		worker_run_request(worker, fr_time());
	}
}
1559
/** Pre-event handler
 *
 * This should be run ONLY in single-threaded mode!
 *
 * Returns 1 when there is at least one runnable request queued (so the
 * caller should not block in the event loop), 0 otherwise.
 */
/* NOTE(review): the function signature line appears to have been lost in this extract
 * (per the doc comment this is fr_worker_pre_event(), with the worker passed via uctx). */
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	request_t *request;

	/* Peek only - the request stays queued for worker_run_request(). */
	request = fr_heap_peek(worker->runnable);
	if (!request) return 0;

	/*
	 *	There's work to do.  Tell the event handler to poll
	 *	for IO / timers, but also immediately return to the
	 *	calling function, which has more work to do.
	 */
	return 1;
}
1579
1580
1581/** Post-event handler
1582 *
1583 * This should be run ONLY in single-threaded mode!
1584 */
1586{
1587 fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
1588
1589 worker_run_request(worker, fr_time()); /* Event loop time can be too old, and trigger asserts */
1590}
1591
/** Print debug information about the worker structure
 *
 * @param[in] worker the worker
 * @param[in] fp the file where the debug output is printed.
 */
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
{

	fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
	fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);

	/*
	 *	Predicted total = (predicted per-request time) *
	 *	(number of requests received).
	 */
	fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
	if (worker->stats.in) {
		fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
		/* NOTE(review): the argument line for this fprintf() appears to have been lost
		 * in this extract (presumably total tracked time / stats.in) - confirm against
		 * the upstream source. */
	}

	fr_time_tracking_debug(&worker->tracking, fp);

}
1614
/** Create a channel to the worker
 *
 * Called by the master (i.e. network) thread when it needs to create
 * a new channel to a particuler worker.
 *
 * @param[in] worker the worker
 * @param[in] master the control plane of the master
 * @param[in] ctx the context in which the channel will be created
 */
/* NOTE(review): the function signature line appears to have been lost in this extract
 * (per the doc comment this is fr_worker_channel_create()). */
{
	fr_channel_t *ch;
	pthread_t id;
	bool same;


	id = pthread_self();
	/*
	 *	"same" is true when the caller is running in the
	 *	worker's own thread.
	 */
	same = (pthread_equal(id, worker->thread_id) != 0);

	ch = fr_channel_create(ctx, master, worker->control, same);
	if (!ch) return NULL;

	/* NOTE(review): a channel-setup statement appears to have been lost here in this
	 * extract - confirm against the upstream source. */

	/*
	 *	Tell the worker about the channel
	 */
	if (fr_channel_signal_open(ch) < 0) {
		talloc_free(ch);
		return NULL;
	}

	return ch;
}
1650
/* NOTE(review): the function signature line appears to have been lost in this extract.
 * From the body, this cancels a listener ("li") on a worker: directly when called from
 * the worker's own thread, otherwise via a control-plane message. */
{
	fr_ring_buffer_t *rb;

	/*
	 *	Skip a bunch of work if we're already in the worker thread.
	 */
	if (is_worker_thread(worker)) {
		return fr_worker_listen_cancel_self(worker, li);
	}

	/*
	 *	Otherwise send a "listener dead" control-plane message
	 *	to the worker.
	 */
	rb = fr_worker_rb_init();
	if (!rb) return -1;

	return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN_DEAD, &li, sizeof(li));
}
1667
1668#ifdef WITH_VERIFY_PTR
1669/** Verify the worker data structures.
1670 *
1671 * @param[in] worker the worker
1672 */
1673static void worker_verify(fr_worker_t *worker)
1674{
1675 int i;
1676
1677 (void) talloc_get_type_abort(worker, fr_worker_t);
1678 fr_atomic_queue_verify(worker->aq_control);
1679
1680 fr_assert(worker->control != NULL);
1681 (void) talloc_get_type_abort(worker->control, fr_control_t);
1682
1683 fr_assert(worker->el != NULL);
1684 (void) talloc_get_type_abort(worker->el, fr_event_list_t);
1685
1686 fr_assert(worker->runnable != NULL);
1687 (void) talloc_get_type_abort(worker->runnable, fr_heap_t);
1688
1689 fr_assert(worker->dedup != NULL);
1690 (void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
1691
1692 for (i = 0; i < worker->config.max_channels; i++) {
1693 if (!worker->channel[i].ch) continue;
1694
1695 (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
1696 }
1697}
1698#endif
1699
1700int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
1701{
1702 if (num < 0) return -1;
1703 if (num == 0) return 0;
1704
1705 stats[0] = worker->stats.in;
1706 if (num >= 2) stats[1] = worker->stats.out;
1707 if (num >= 3) stats[2] = worker->stats.dup;
1708 if (num >= 4) stats[3] = worker->stats.dropped;
1709 if (num >= 5) stats[4] = worker->num_naks;
1710 if (num >= 6) stats[5] = worker->num_active;
1711
1712 if (num <= 6) return num;
1713
1714 return 6;
1715}
1716
1717static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
1718{
1719 fr_worker_t const *worker = ctx;
1720 fr_time_delta_t when;
1721
1722 if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
1723 fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
1724 fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
1725 fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
1726 fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
1727 fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
1728 fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
1729 fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
1730 }
1731
1732 if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
1733 when = worker->predicted;
1734 fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1735
1736 when = worker->tracking.running_total;
1737 if (fr_time_delta_ispos(when) && (worker->stats.in > worker->stats.dropped)) {
1738 when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
1739 }
1740 fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1741
1742 when = worker->tracking.running_total;
1743 fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1744
1745 when = worker->tracking.waiting_total;
1746 fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);
1747
1748 fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
1749 fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
1750 }
1751
1752 return 0;
1753}
1754
1756 {
1757 .parent = "stats",
1758 .name = "worker",
1759 .help = "Statistics for workers threads.",
1760 .read_only = true
1761 },
1762
1763 {
1764 .parent = "stats worker",
1765 .add_name = true,
1766 .name = "self",
1767 .syntax = "[(count|cpu)]",
1768 .func = cmd_stats_worker,
1769 .help = "Show statistics for a specific worker thread.",
1770 .read_only = true
1771 },
1772
1774};
static int const char char buffer[256]
Definition acutest.h:578
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition app_io.h:55
size_t default_reply_size
same for replies
Definition app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define UNCONST(_type, _ptr)
Remove const qualification from a pointer.
Definition build.h:167
#define RCSID(id)
Definition build.h:487
#define NDEBUG_UNUSED
Definition build.h:328
#define CMP_PREFER_LARGER(_a, _b)
Evaluates to -1 for a > b, and +1 for a < b.
Definition build.h:108
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:383
#define UNUSED
Definition build.h:317
unlang_action_t unlang_call_push(unlang_result_t *p_result, request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition call.c:147
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:938
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:897
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
int argc
current argument count
Definition command.h:39
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:131
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:202
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:148
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:128
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:654
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:120
Head of a doubly linked list.
Definition dlist.h:51
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:325
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
talloc_free(hp)
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for a current request.
Definition interpret.c:939
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:2022
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition interpret.c:1541
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:2053
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
Definition interpret.c:1981
bool unlang_request_is_cancelled(request_t const *request)
Return whether a request has been cancelled.
Definition interpret.c:1591
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1405
bool unlang_interpret_is_resumable(request_t *request)
Check if a request as resumable.
Definition interpret.c:1610
#define UNLANG_REQUEST_RESUME
Definition interpret.h:43
#define UNLANG_TOP_FRAME
Definition interpret.h:36
External functions provided by the owner of the interpret.
Definition interpret.h:110
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:428
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:340
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
void const * app_instance
Definition listen.h:39
fr_app_t const * app
Definition listen.h:38
void const * app_io_instance
I/O path configuration context.
Definition listen.h:33
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:42
fr_dict_t const * dict
dictionary for this listener
Definition listen.h:30
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:32
Minimal data structure to use the new code.
Definition listen.h:63
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RWARN(fmt,...)
Definition log.h:309
#define RERROR(fmt,...)
Definition log.h:310
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RPERROR(fmt,...)
Definition log.h:314
#define RPEDEBUG(fmt,...)
Definition log.h:388
#define RDEBUG4(fmt,...)
Definition log.h:356
#define RATE_LIMIT_GLOBAL(_log, _fmt,...)
Rate limit messages using a global limiting entry.
Definition log.h:653
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2193
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2058
int fr_event_post_delete(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2029
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Definition event.c:2382
Stores all information relating to an event list.
Definition event.c:377
fr_log_lvl_t
Definition log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
unsigned int uint32_t
long int ssize_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:128
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:196
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1001
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1251
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:947
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
static const conf_parser_t config[]
Definition base.c:169
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
#define RDEBUG(fmt,...)
#define DEBUG2(fmt,...)
static void send_reply(int sockfd, fr_channel_data_t *reply)
void * fr_rb_remove(fr_rb_tree_t *tree, void const *data)
Remove an entry from the tree, without freeing the data.
Definition rb.c:695
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove node and free data (if a free function was specified)
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocs a red black that verifies elements are of a specific talloc type.
Definition rb.h:246
The main red black tree structure.
Definition rb.h:73
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:44
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
Definition rcode.h:56
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
Definition request.c:383
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:542
#define REQUEST_VERIFY(_x)
Definition request.h:309
#define request_is_detached(_x)
Definition request.h:186
#define request_is_external(_x)
Definition request.h:184
#define REQUEST_POOL_HEADERS
Definition request.h:66
#define request_is_internal(_x)
Definition request.h:185
@ REQUEST_TYPE_EXTERNAL
A request received on the wire.
Definition request.h:178
#define request_is_detachable(_x)
Definition request.h:187
#define request_init(_ctx, _type, _args)
Definition request.h:321
#define REQUEST_POOL_SIZE
Definition request.h:79
Optional arguments for initialising requests.
Definition request.h:287
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
@ FR_SIGNAL_DUP
A duplicate request was received.
Definition signal.h:44
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
Definition slab.h:120
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
Definition slab.h:72
unsigned int num_children
How many child allocations are expected off each element.
Definition slab.h:48
size_t child_pool_size
Size of pool space to be allocated to each element.
Definition slab.h:49
return count
Definition module.c:155
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define _Atomic(T)
Definition stdatomic.h:77
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:230
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition time.c:547
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition time.c:592
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_eq(_a, _b)
Definition time.h:241
#define NSEC
Definition time.h:379
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
uint64_t fr_timer_list_num_events(fr_timer_list_t *tl)
Return number of pending events.
Definition timer.c:1149
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
Definition timer.c:1291
int fr_timer_list_force_run(fr_timer_list_t *tl)
Forcibly run all events in an event loop.
Definition timer.c:921
An event timer list.
Definition timer.c:50
#define fr_timer_in(...)
Definition timer.h:87
static bool fr_timer_armed(fr_timer_t *ev)
Definition timer.h:120
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition compile.c:2293
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:732
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1334
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition worker.c:118
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition worker.c:578
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition worker.c:1259
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition worker.c:252
fr_event_list_t * el
our event list
Definition worker.c:114
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1564
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition worker.c:595
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1624
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition worker.c:125
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition worker.c:1308
static void worker_exit(fr_worker_t *worker)
Definition worker.c:230
#define WORKER_VERIFY
Definition worker.c:75
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition worker.c:137
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition worker.c:1717
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition worker.c:1248
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition worker.c:720
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1357
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition worker.c:994
fr_worker_channel_t * channel
list of channels
Definition worker.c:140
char const * name
name of this worker
Definition worker.c:100
uint64_t num_active
number of active requests
Definition worker.c:132
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1755
static int worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition worker.c:548
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition worker.c:1700
static int _worker_request_deinit(request_t *request, UNUSED void *uctx)
Definition worker.c:773
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e.
Definition worker.c:1183
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition worker.c:1268
fr_rb_tree_t * dedup
de-dup tree
Definition worker.c:123
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition worker.c:110
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition worker.c:418
static void worker_request_name_number(request_t *request)
Definition worker.c:760
static void _worker_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition worker.c:528
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition worker.c:778
fr_log_t const * log
log destination
Definition worker.c:107
fr_io_stats_t stats
input / output stats
Definition worker.c:127
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition worker.c:1216
static int _fr_worker_rb_free(void *arg)
Definition worker.c:170
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition worker.c:135
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition worker.c:1097
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1017
uint64_t num_naks
number of messages which were nak'd
Definition worker.c:131
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition worker.c:743
fr_worker_config_t config
external configuration
Definition worker.c:101
fr_listen_t const * listener
incoming packets
Definition worker.c:146
unlang_interpret_t * intp
Worker's local interpreter.
Definition worker.c:103
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:369
static void worker_stop_request(request_t *request)
Signal the unlang interpreter that it needs to stop running the request.
Definition worker.c:509
static void _worker_request_prioritise(request_t *request, void *uctx)
Update a request's priority.
Definition worker.c:1285
bool exiting
are we exiting?
Definition worker.c:138
fr_log_lvl_t lvl
log level
Definition worker.c:108
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition worker.c:221
int num_channels
actual number of channels
Definition worker.c:116
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.c:121
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition worker.c:128
fr_rb_node_t node
in tree of listeners
Definition worker.c:148
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1651
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1585
fr_dlist_head_t dlist
of requests associated with this listener.
Definition worker.c:154
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1506
request_slab_list_t * slab
slab allocator for request_t
Definition worker.c:142
static uint32_t worker_num_requests(fr_worker_t *worker)
Definition worker.c:768
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition worker.c:134
pthread_t thread_id
my thread ID
Definition worker.c:105
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition worker.c:211
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition worker.c:129
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition worker.c:158
static bool is_worker_thread(fr_worker_t const *worker)
Definition worker.c:197
fr_worker_channel_t
Definition worker.c:94
fr_timer_list_t * timeout
Track when requests timeout using a dlist.
Definition worker.c:120
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition worker.c:179
fr_control_t * control
the control plane
Definition worker.c:112
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition worker.c:1277
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition worker.c:1167
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition worker.c:1597
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition worker.c:1076
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition worker.c:968
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition worker.c:396
A worker which takes packets from a master, and processes them.
Definition worker.c:99
int message_set_size
default start number of messages
Definition worker.h:64
int max_requests
max requests this worker will handle
Definition worker.h:60
int max_channels
maximum number of channels
Definition worker.h:62
fr_slab_config_t reuse
slab allocator configuration
Definition worker.h:69
int ring_buffer_size
default start size for the ring buffers
Definition worker.h:65
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:67