worker.c
/*
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

/**
 * $Id: e2b2aa4df197056d1663a6687c4c61688da97810 $
 *
 * @brief Worker thread functions.
 * @file io/worker.c
 *
 * The "worker" thread is the one responsible for the bulk of the
 * work done when processing a request.  Workers are spawned by the
 * scheduler, and create a kqueue (KQ) and control-plane
 * Atomic Queue (AQ) for control-plane communication.
 *
 * When a network thread discovers that it needs more workers, it
 * asks the scheduler for a KQ/AQ combination.  The network thread
 * then creates a channel dedicated to that worker, and sends the
 * channel to the worker in a "new channel" message.  The worker
 * receives the channel, and sends an ACK back to the network thread.
 *
 * The network thread then sends the worker new packets, which the
 * worker receives and processes.
 *
 * When a packet is decoded, it is put into the "runnable" heap, and
 * also into the "time_order" heap.  The main loop fr_worker() then
 * pulls new requests off of this heap and runs them.  The
 * worker_max_request_time() function also checks the tail of the
 * "time_order" heap, and ages out requests which have been active
 * for "too long".
 *
 * If a request yields, it is placed onto the yielded list in
 * the worker "tracking" data structure.
 *
 * @copyright 2016 Alan DeKok (aland@freeradius.org)
 */

RCSID("$Id: e2b2aa4df197056d1663a6687c4c61688da97810 $")
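
/*
 *	A minimal sketch of the lifecycle described above, as seen from
 *	the scheduler / network side.  The lowercase variables here
 *	(ctx, el, log, lvl, config, network_control) are hypothetical;
 *	the fr_worker_* functions are the ones defined in this file:
 *
 *	@code
 *	fr_worker_t  *worker;
 *	fr_channel_t *ch;
 *
 *	worker = fr_worker_create(ctx, el, "worker 0", log, lvl, config);
 *
 *	// From a network thread: create a dedicated channel to the worker.
 *	// This sends the "new channel" message, which the worker handles
 *	// in worker_channel_callback() as FR_CHANNEL_OPEN.
 *	ch = fr_worker_channel_create(worker, ctx, network_control);
 *
 *	fr_worker(worker);		// worker thread main loop, runs until all channels close
 *	fr_worker_destroy(worker);	// explicit teardown
 *	@endcode
 */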

#define LOG_PREFIX worker->name
#define LOG_DST worker->log

#include <freeradius-devel/io/channel.h>
#include <freeradius-devel/io/listen.h>
#include <freeradius-devel/io/message.h>
#include <freeradius-devel/io/worker.h>
#include <freeradius-devel/unlang/base.h>
#include <freeradius-devel/unlang/call.h>
#include <freeradius-devel/unlang/interpret.h>
#include <freeradius-devel/server/request.h>
#include <freeradius-devel/server/time_tracking.h>
#include <freeradius-devel/util/dlist.h>
#include <freeradius-devel/util/minmax_heap.h>

#include <stdalign.h>

#ifdef WITH_VERIFY_PTR
static void worker_verify(fr_worker_t *worker);
#define WORKER_VERIFY worker_verify(worker)
#else
#define WORKER_VERIFY
#endif

#define CACHE_LINE_SIZE	64
static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;	/* atomic counter for request numbering, aligned to avoid false sharing */

static _Thread_local fr_ring_buffer_t *fr_worker_rb;

typedef struct {
	fr_channel_t		*ch;			//!< the channel

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;			//!< list of requests associated with this channel
} fr_worker_channel_t;
/**
 *  A worker which takes packets from a master, and processes them.
 */
struct fr_worker_s {
	char const		*name;		//!< name of this worker
	fr_worker_config_t	config;		//!< external configuration

	unlang_interpret_t	*intp;		//!< Worker's local interpreter.

	pthread_t		thread_id;	//!< my thread ID

	fr_log_t const		*log;		//!< log destination
	fr_log_lvl_t		lvl;		//!< log level

	fr_atomic_queue_t	*aq_control;	//!< atomic queue for control messages sent to me

	fr_control_t		*control;	//!< the control plane

	fr_event_list_t		*el;		//!< our event list

	int			num_channels;	//!< actual number of channels

	fr_heap_t		*runnable;	//!< current runnable requests which we've spent time processing
	fr_minmax_heap_t	*time_order;	//!< time ordered heap of requests
	fr_rb_tree_t		*dedup;		//!< de-dup tree

	fr_rb_tree_t		*listeners;	//!< so we can cancel requests when a listener goes away

	fr_io_stats_t		stats;		//!< input / output stats
	fr_time_elapsed_t	cpu_time;	//!< histogram of total CPU time per request
	fr_time_elapsed_t	wall_clock;	//!< histogram of wall clock time per request

	uint64_t		num_naks;	//!< number of messages which were nak'd
	uint64_t		num_active;	//!< number of active requests

	fr_time_delta_t		predicted;	//!< How long we predict a request will take to execute.
	fr_time_tracking_t	tracking;	//!< how much time the worker has spent doing things.

	bool			was_sleeping;	//!< used to suppress multiple sleep signals in a row
	bool			exiting;	//!< are we exiting?

	fr_time_t		checked_timeout; //!< when we last checked the tails of the queues

	fr_event_timer_t const	*ev_cleanup;	//!< timer for max_request_time

	fr_worker_channel_t	*channel;	//!< list of channels
};

typedef struct {
	fr_listen_t const	*listener;	//!< incoming packets

	fr_rb_node_t		node;		//!< in tree of listeners

	/*
	 *	To save time, we don't care about num_elements here.  Which means that we don't
	 *	need to cache or lookup the fr_worker_listen_t when we free a request.
	 */
	fr_dlist_head_t		dlist;		//!< of requests associated with this listener.
} fr_worker_listen_t;

static int8_t worker_listener_cmp(void const *one, void const *two)
{
	fr_worker_listen_t const *a = one, *b = two;

	return CMP(a->listener, b->listener);
}


/*
 *	Explicitly cleanup the memory allocated to the ring buffer,
 *	just in case valgrind complains about it.
 */
static int _fr_worker_rb_free(void *arg)
{
	return talloc_free(arg);
}
/** Initialise thread local storage
 *
 * @return fr_ring_buffer_t for messages
 */
static fr_ring_buffer_t *fr_worker_rb_init(void)
{
	fr_ring_buffer_t *rb;

	rb = fr_worker_rb;
	if (rb) return rb;

	rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
	if (!rb) {
		fr_perror("Failed allocating memory for worker ring buffer");
		return NULL;
	}

	fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);

	return rb;
}

static inline bool is_worker_thread(fr_worker_t const *worker)
{
	return (pthread_equal(pthread_self(), worker->thread_id) != 0);
}

static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now);
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx);
static void worker_max_request_timer(fr_worker_t *worker);

/** Callback which handles a message being received on the worker side.
 *
 * @param[in] ctx the worker
 * @param[in] ch the channel to drain
 * @param[in] cd the message (if any) to start with
 */
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
{
	fr_worker_t *worker = ctx;

	worker->stats.in++;
	DEBUG3("Received request %" PRIu64 "", worker->stats.in);
	cd->channel.ch = ch;
	worker_request_bootstrap(worker, cd, fr_time());
}

static void worker_requests_cancel(fr_worker_channel_t *ch)
{
	request_t *request;

	while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
		unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
	}
}

static void worker_exit(fr_worker_t *worker)
{
	worker->exiting = true;

	/*
	 *	Don't allow the post event to run
	 *	any more requests.  They'll be
	 *	signalled to stop before we exit.
	 *
	 *	This only has an effect in single
	 *	threaded mode.
	 */
	(void) fr_event_post_delete(worker->el, fr_worker_post_event, worker);
}

/** Handle a control plane message sent to the worker via a channel
 *
 * @param[in] ctx the worker
 * @param[in] data the message
 * @param[in] data_size size of the data
 * @param[in] now the current time
 */
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	int			i;
	bool			ok, was_sleeping;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	fr_channel_event_t	ce;
	fr_worker_t		*worker = ctx;

	was_sleeping = worker->was_sleeping;
	worker->was_sleeping = false;

	/*
	 *	We were woken up by a signal to do something.  We're
	 *	not sleeping.
	 */
	ce = fr_channel_service_message(now, &ch, data, data_size);
	DEBUG3("Channel %s",
	       fr_table_str_by_value(channel_signals, ce, "<INVALID>"));
	switch (ce) {
	case FR_CHANNEL_ERROR:
		return;

	case FR_CHANNEL_EMPTY:
		return;

	case FR_CHANNEL_NOOP:
		return;

	case FR_CHANNEL_DATA_READY_REQUESTOR:
		fr_assert(0 == 1);
		break;

	case FR_CHANNEL_DATA_READY_RESPONDER:
		fr_assert(ch != NULL);

		if (!fr_channel_recv_request(ch)) {
			worker->was_sleeping = was_sleeping;

		} else while (fr_channel_recv_request(ch));
		break;

	case FR_CHANNEL_OPEN:
		fr_assert(ch != NULL);

		ok = false;
		for (i = 0; i < worker->config.max_channels; i++) {
			fr_assert(worker->channel[i].ch != ch);

			if (worker->channel[i].ch != NULL) continue;

			worker->channel[i].ch = ch;
			fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);

			DEBUG3("Received channel %p into array entry %d", ch, i);

			ms = fr_message_set_create(worker, worker->config.message_set_size,
						   sizeof(fr_channel_data_t),
						   worker->config.ring_buffer_size);
			fr_assert(ms != NULL);
			fr_channel_responder_uctx_add(ch, ms);

			worker->num_channels++;
			ok = true;
			break;
		}

		fr_cond_assert(ok);
		break;

	case FR_CHANNEL_CLOSE:
		fr_assert(ch != NULL);

		ok = false;

		/*
		 *	Locate the signalling channel in the list
		 *	of channels.
		 */
		for (i = 0; i < worker->config.max_channels; i++) {
			if (!worker->channel[i].ch) continue;

			if (worker->channel[i].ch != ch) continue;

			worker_requests_cancel(&worker->channel[i]);

			fr_cond_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
					   "Network added messages to channel after sending FR_CHANNEL_CLOSE");

			ms = fr_channel_responder_uctx_get(ch);
			fr_assert(ms != NULL);
			fr_channel_responder_ack_close(ch);
			talloc_free(ms);

			worker->channel[i].ch = NULL;

			fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
			fr_assert(worker->num_channels > 0);

			worker->num_channels--;
			ok = true;
			break;
		}

		fr_cond_assert(ok);

		/*
		 *	Our last input channel closed,
		 *	time to die.
		 */
		if (worker->num_channels == 0) worker_exit(worker);
		break;
	}
}

static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
{
	fr_worker_listen_t	*wl;
	request_t		*request;

	wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
	if (!wl) return -1;

	while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
		RDEBUG("Canceling request due to socket being closed");
		unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
	}

	(void) fr_rb_delete(worker->listeners, wl);
	talloc_free(wl);

	return 0;
}


/** A socket is going away, so clean up any requests which use this socket.
 *
 * @param[in] ctx the worker
 * @param[in] data the message
 * @param[in] data_size size of the data
 * @param[in] now the current time
 */
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
{
	fr_listen_t const *li;
	fr_worker_t *worker = ctx;

	fr_assert(data_size == sizeof(li));

	memcpy(&li, data, sizeof(li));

	(void) fr_worker_listen_cancel_self(worker, li);
}

/** Send a NAK to the network thread
 *
 * The network thread believes that a worker is running a request until that request has been NAK'd.
 * We typically NAK requests when they've been hanging around in the worker's backlog too long,
 * or there was an error executing the request.
 *
 * @param[in] worker the worker
 * @param[in] cd the message to NAK
 * @param[in] now when the message is NAKd
 */
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	size_t			size;
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	fr_listen_t		*listen;

	worker->num_naks++;

	/*
	 *	Cache the outbound channel.  We'll need it later.
	 */
	ch = cd->channel.ch;
	listen = cd->listen;

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send NAK but channel has been closed")) {
		fr_message_done(&cd->m);
		return;
	}

	ms = fr_channel_responder_uctx_get(ch);
	fr_assert(ms != NULL);

	size = listen->app_io->default_reply_size;
	if (!size) size = listen->app_io->default_message_size;

	/*
	 *	Allocate a default message size.
	 */
	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode a NAK
	 */
	if (listen->app_io->nak) {
		size = listen->app_io->nak(listen, cd->packet_ctx, cd->m.data,
					   cd->m.data_size, reply->m.data, reply->m.rb_size);
	} else {
		size = 1;	/* rely on them to figure it the heck out */
	}

	(void) fr_message_alloc(ms, &reply->m, size);

	/*
	 *	Fill in the NAK.
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = fr_time_delta_from_sec(10); /* @todo - set to something better? */
	reply->reply.request_time = cd->request.recv_time;

	reply->listen = cd->listen;
	reply->packet_ctx = cd->packet_ctx;

	/*
	 *	Mark the original message as done.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		DEBUG2("Failed sending reply to channel");
	}

	worker->stats.out++;
}

/** Signal the unlang interpreter that it needs to stop running the request
 *
 * Signalling is a synchronous operation.  Whatever I/O requests the request
 * is currently performing are immediately cancelled, and all the frames are
 * popped off the unlang stack.
 *
 * Modules and unlang keywords explicitly register signal handlers to deal
 * with their yield points being cancelled/interrupted via this function.
 *
 * The caller should assume the request is no longer viable after calling
 * this function.
 *
 * @param[in] request_p Pointer to the request to cancel.
 *			Will be set to NULL.
 */
static void worker_stop_request(request_t **request_p)
{
	/*
	 *	Also marks the request as done and runs
	 *	the internal/external callbacks.
	 */
	unlang_interpret_signal(*request_p, FR_SIGNAL_CANCEL);
	*request_p = NULL;
}

/** Enforce max_request_time
 *
 * Run periodically, and tries to clean up requests which were received by the network
 * thread more than max_request_time seconds ago.  In the interest of not adding a
 * timer for every packet, the requests are given a 1 second leeway.
 *
 * @param[in] el the worker's event list
 * @param[in] when the current time
 * @param[in] uctx the fr_worker_t.
 */
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
{
	fr_time_t	now = fr_time();
	request_t	*request;
	fr_worker_t	*worker = talloc_get_type_abort(uctx, fr_worker_t);

	/*
	 *	Look at the oldest requests, and see if they need to
	 *	be deleted.
	 */
	while ((request = fr_minmax_heap_min_peek(worker->time_order)) != NULL) {
		fr_time_t cleanup;

		REQUEST_VERIFY(request);

		cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);
		if (fr_time_gt(cleanup, now)) break;

		/*
		 *	Waiting too long, delete it.
		 */
		REDEBUG("Request has reached max_request_time - signalling it to stop");
		(void) fr_minmax_heap_extract(worker->time_order, request);
		worker_stop_request(&request);
	}

	/*
	 *	Reset the max request timer.
	 */
	worker_max_request_timer(worker);
}

/** See when we next need to service the time_order heap for "too old" packets
 *
 * Inserts a timer into the event list which will trigger when the packet that
 * was received longest ago would be older than max_request_time.
 */
static void worker_max_request_timer(fr_worker_t *worker)
{
	fr_time_t	cleanup;
	request_t	*request;

	/*
	 *	No more requests, delete the timer.
	 */
	request = fr_minmax_heap_max_peek(worker->time_order);
	if (!request) return;

	cleanup = fr_time_add(request->async->recv_time, worker->config.max_request_time);

	DEBUG2("Resetting cleanup timer to +%pV", fr_box_time_delta(worker->config.max_request_time));
	if (fr_event_timer_at(worker, worker->el, &worker->ev_cleanup,
			      cleanup, worker_max_request_time, worker) < 0) {
		ERROR("Failed inserting max_request_time timer");
	}
}

/** Start time tracking for a request, and mark it as runnable.
 *
 */
static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	/*
	 *	New requests are inserted into the time order heap in
	 *	strict time priority.  Once they are in the list, they
	 *	are only removed when the request is done / free'd.
	 */
	fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
	(void) fr_minmax_heap_insert(worker->time_order, request);

	/*
	 *	Bootstrap the async state machine with the initial
	 *	state of the request.
	 */
	RDEBUG3("Time tracking started in yielded state");
	fr_time_tracking_start(&worker->tracking, &request->async->tracking, now);
	fr_time_tracking_yield(&request->async->tracking, now);
	worker->num_active++;

	fr_assert(!fr_heap_entry_inserted(request->runnable_id));
	(void) fr_heap_insert(&worker->runnable, request);

	if (!worker->ev_cleanup) worker_max_request_timer(worker);
}

static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	RDEBUG3("Time tracking ended");
	fr_time_tracking_end(&worker->predicted, &request->async->tracking, now);
	fr_assert(worker->num_active > 0);
	worker->num_active--;

	if (fr_minmax_heap_entry_inserted(request->time_order_id)) (void) fr_minmax_heap_extract(worker->time_order, request);
}

/** Send a response packet to the network side
 *
 * @param[in] worker This worker.
 * @param[in] request we're sending a reply for.
 * @param[in] send_reply whether to encode and send a real reply to the network side
 * @param[in] now The current time
 */
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool send_reply, fr_time_t now)
{
	fr_channel_data_t	*reply;
	fr_channel_t		*ch;
	fr_message_set_t	*ms;
	size_t			size = 1;

	REQUEST_VERIFY(request);

	/*
	 *	If we're sending a reply, then it's no longer runnable.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable_id));

	if (send_reply) {
		size = request->async->listen->app_io->default_reply_size;
		if (!size) size = request->async->listen->app_io->default_message_size;
	}

	/*
	 *	Allocate and send the reply.
	 */
	ch = request->async->channel;
	fr_assert(ch != NULL);

	/*
	 *	If the channel has been closed, but we haven't
	 *	been informed, that is extremely bad.
	 *
	 *	Try to continue working... but we'll likely
	 *	leak memory or SEGV soon.
	 */
	if (!fr_cond_assert_msg(fr_channel_active(ch), "Wanted to send reply but channel has been closed")) {
		return;
	}

	ms = fr_channel_responder_uctx_get(ch);
	fr_assert(ms != NULL);

	reply = (fr_channel_data_t *) fr_message_reserve(ms, size);
	fr_assert(reply != NULL);

	/*
	 *	Encode it, if required.
	 */
	if (send_reply) {
		ssize_t slen = 0;
		fr_listen_t const *listen = request->async->listen;

		if (listen->app_io->encode) {
			slen = listen->app_io->encode(listen->app_io_instance, request,
						      reply->m.data, reply->m.rb_size);
		} else if (listen->app->encode) {
			slen = listen->app->encode(listen->app_instance, request,
						   reply->m.data, reply->m.rb_size);
		}
		if (slen < 0) {
			RPERROR("Failed encoding request");
			*reply->m.data = 0;
			slen = 1;
		}

		/*
		 *	Shrink the buffer to the actual packet size.
		 *
		 *	This will ALWAYS return the same message as we put in.
		 */
		fr_assert((size_t) slen <= reply->m.rb_size);
		(void) fr_message_alloc(ms, &reply->m, slen);
	}

	/*
	 *	Fill in the rest of the fields in the channel message.
	 *
	 *	sequence / ack will be filled in by fr_channel_send_reply()
	 */
	reply->m.when = now;
	reply->reply.cpu_time = worker->tracking.running_total;
	reply->reply.processing_time = request->async->tracking.running_total;
	reply->reply.request_time = request->async->recv_time;

	reply->listen = request->async->listen;
	reply->packet_ctx = request->async->packet_ctx;

	/*
	 *	Update the various timers.
	 */
	fr_time_elapsed_update(&worker->cpu_time, now, fr_time_add(now, reply->reply.processing_time));
	fr_time_elapsed_update(&worker->wall_clock, reply->reply.request_time, now);

	RDEBUG("Finished request");

	/*
	 *	Send the reply, which also polls the request queue.
	 */
	if (fr_channel_send_reply(ch, reply) < 0) {
		/*
		 *	Should only happen if the TO_REQUESTOR
		 *	channel is full, or it's not yet active.
		 *
		 *	Not much we can do except complain
		 *	loudly and cleanup the request.
		 */
		RPERROR("Failed sending reply to network thread");
	}

	worker->stats.out++;

	fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
	fr_assert(!fr_heap_entry_inserted(request->runnable_id));

	fr_dlist_entry_unlink(&request->listen_entry);

#ifndef NDEBUG
	request->async->el = NULL;
	request->async->channel = NULL;
	request->async->packet_ctx = NULL;
	request->async->listen = NULL;
#endif
}

/*
 *	talloc_typed_asprintf() is horrifically slow for printing
 *	simple numbers.
 */
static char *itoa_internal(TALLOC_CTX *ctx, uint64_t number)
{
	char buffer[32];
	char *p;
	char const *numbers = "0123456789";

	p = buffer + 30;
	*(p--) = '\0';

	while (number > 0) {
		*(p--) = numbers[number % 10];
		number /= 10;
	}

	if (p[1]) return talloc_strdup(ctx, p + 1);

	return talloc_strdup(ctx, "0");
}

/** Initialize various request fields needed by the worker.
 *
 */
static inline CC_HINT(always_inline)
void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
{
	/*
	 *	For internal requests request->packet
	 *	and request->reply are already populated.
	 */
	if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
	if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));

	request->packet->timestamp = now;
	request->async = talloc_zero(request, fr_async_t);
	request->async->recv_time = now;
	request->async->el = worker->el;
	fr_dlist_entry_init(&request->async->entry);
}

static inline CC_HINT(always_inline)
void worker_request_name_number(request_t *request)
{
	request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
	if (request->name) talloc_const_free(request->name);
	request->name = itoa_internal(request, request->number);
}

static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
{
	int			ret = -1;
	request_t		*request;
	TALLOC_CTX		*ctx;
	fr_listen_t const	*listen;

	if (fr_minmax_heap_num_elements(worker->time_order) >= (uint32_t) worker->config.max_requests) goto nak;

	ctx = request = request_alloc_external(NULL, NULL);
	if (!request) goto nak;

	worker_request_init(worker, request, now);
	worker_request_name_number(request);

	/*
	 *	Associate our interpreter with the request
	 */
	unlang_interpret_set(request, worker->intp);

	request->packet->timestamp = cd->request.recv_time; /* Legacy - Remove once everything looks at request->async */

	/*
	 *	Receive a message to the worker queue, and decode it
	 *	to a request.
	 */
	fr_assert(cd->listen != NULL);

	/*
	 *	Update the transport-specific fields.
	 */
	request->async->channel = cd->channel.ch;

	request->async->recv_time = cd->request.recv_time;

	request->async->listen = cd->listen;
	request->async->packet_ctx = cd->packet_ctx;
	request->async->priority = cd->priority;
	listen = request->async->listen;

	/*
	 *	Now that the "request" structure has been initialized, go decode the packet.
	 *
	 *	Note that this also sets the "async process" function.
	 */
	if (listen->app->decode) {
		ret = listen->app->decode(listen->app_instance, request, cd->m.data, cd->m.data_size);
	} else if (listen->app_io->decode) {
		ret = listen->app_io->decode(listen->app_io_instance, request, cd->m.data, cd->m.data_size);
	}

	if (ret < 0) {
		talloc_free(ctx);
nak:
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	Set the entry point for this virtual server.
	 */
	if (unlang_call_push(request, cd->listen->server_cs, UNLANG_TOP_FRAME) < 0) {
		RERROR("Protocol failed to set 'process' function");
		worker_nak(worker, cd, now);
		return;
	}

	/*
	 *	We're done with this message.
	 */
	fr_message_done(&cd->m);

	/*
	 *	Look for conflicting / duplicate packets, but only if
	 *	requested to do so.
	 */
	if (request->async->listen->track_duplicates) {
		request_t *old;

		old = fr_rb_find(worker->dedup, request);
		if (!old) {
			goto insert_new;
		}

		fr_assert(old->async->listen == request->async->listen);
		fr_assert(old->async->channel == request->async->channel);

		/*
		 *	There's a new packet.  Do we keep the old one,
		 *	or the new one?  This decision is made by
		 *	checking the recv_time, which is a
		 *	nanosecond-resolution timer.  If the time is
		 *	identical, then the new packet is the same as
		 *	the old one.
		 *
		 *	If the new packet is a duplicate of the old
		 *	one, then we can just discard the new one.  We
		 *	have to tell the channel that we've "eaten"
		 *	this reply, so the sequence number should
		 *	increase.
		 *
		 *	@todo - fix the channel code to do queue
		 *	depth, and not sequence / ack.
		 */
		if (fr_time_eq(old->async->recv_time, request->async->recv_time)) {
			RWARN("Discarding duplicate of request (%"PRIu64")", old->number);

			fr_channel_null_reply(request->async->channel);
			talloc_free(request);

			/*
			 *	Signal there's a dup, and ignore the
			 *	return code.  We don't bother replying
			 *	here, as an FD event or timer will
			 *	wake up the request, and cause it to
			 *	continue.
			 *
			 *	@todo - the old request is NOT
			 *	running, but is yielded.  It MAY clean
			 *	itself up, or do something...
			 */
			unlang_interpret_signal(old, FR_SIGNAL_DUP);
			worker->stats.dup++;
			return;
		}

		/*
		 *	Stop the old request, and decrement the number
		 *	of active requests.
		 */
		RWARN("Got conflicting packet for request (%" PRIu64 "), telling old request to stop", old->number);

		worker_stop_request(&old);
		worker->stats.dropped++;

	insert_new:
		(void) fr_rb_insert(worker->dedup, request);
	}

	worker_request_time_tracking_start(worker, request, now);

	{
		fr_worker_listen_t *wl;

		wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
		if (!wl) {
			MEM(wl = talloc_zero(worker, fr_worker_listen_t));
			fr_dlist_init(&wl->dlist, request_t, listen_entry);
			wl->listener = listen;

			(void) fr_rb_insert(worker->listeners, wl);
		}

		fr_dlist_insert_tail(&wl->dlist, request);
	}
}

/**
 *  Track a request_t in the "runnable" heap.
 *
 *  Higher priorities take precedence, followed by lower sequence numbers,
 *  then earlier receive times.
 */
static int8_t worker_runnable_cmp(void const *one, void const *two)
{
	request_t const *a = one, *b = two;
	int ret;

	ret = CMP(b->async->priority, a->async->priority);
	if (ret != 0) return ret;

	ret = CMP(a->async->sequence, b->async->sequence);
	if (ret != 0) return ret;

	return fr_time_cmp(a->async->recv_time, b->async->recv_time);
}
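
/*
 *	Note the argument order above: CMP(b, a) on priority sorts
 *	descending (a higher priority request pops from the heap first),
 *	while CMP(a, b) on sequence sorts ascending.  For example, a
 *	priority 3 request is popped before a priority 1 request, and
 *	two equal-priority requests pop in sequence order.
 */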

/**
 *  Track a request_t in the "time_order" heap.
 */
static int8_t worker_time_order_cmp(void const *one, void const *two)
{
	request_t const *a = one, *b = two;

	return fr_time_cmp(a->async->recv_time, b->async->recv_time);
}

/**
 *  Track a request_t in the "dedup" tree
 */
static int8_t worker_dedup_cmp(void const *one, void const *two)
{
	int ret;
	request_t const *a = one, *b = two;

	ret = CMP(a->async->listen, b->async->listen);
	if (ret) return ret;

	return CMP(a->async->packet_ctx, b->async->packet_ctx);
}

/** Destroy a worker
 *
 * The input channels are signaled, and local messages are cleaned up.
 *
 * This should be called to _EXPLICITLY_ destroy a worker, when some fatal
 * error has occurred on the worker side, and we need to destroy it.
 *
 * We signal all pending requests in the backlog to stop, and tell the
 * network side that it should not send us any more requests.
 *
 * @param[in] worker the worker to destroy.
 */
void fr_worker_destroy(fr_worker_t *worker)
{
	int i, count;
	request_t *request;

//	WORKER_VERIFY;

	/*
	 *	Stop any new requests running with this interpreter
	 */
	unlang_interpret_set_thread_default(NULL);

	/*
	 *	Destroy all of the active requests.  These are ones
	 *	which are still waiting for timers or file descriptor
	 *	events.
	 */
	count = 0;
	while ((request = fr_minmax_heap_min_peek(worker->time_order)) != NULL) {
		if (count < 10) {
			DEBUG("Worker is exiting - telling request %s to stop", request->name);
			count++;
		}
		worker_stop_request(&request);
	}

	/*
	 *	Signal the channels that we're closing.
	 *
	 *	The other end owns the channel, and will take care of
	 *	popping messages in the TO_RESPONDER queue, and marking
	 *	them FR_MESSAGE_DONE.  It will ignore the messages in
	 *	the TO_REQUESTOR queue, as we own those.  They will be
	 *	automatically freed when our talloc context is freed.
	 */
	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		worker_requests_cancel(&worker->channel[i]);

		fr_cond_assert_msg(fr_dlist_num_elements(&worker->channel[i].dlist) == 0,
				   "Pending messages in channel after cancelling request");

		fr_channel_responder_ack_close(worker->channel[i].ch);
	}

	talloc_free(worker);
}

/** Initialise an internal request (i.e. one generated by the interpreter)
 *
 */
static void _worker_request_internal_init(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	worker_request_init(worker, request, now);

	/*
	 *	Requests generated by the interpreter
	 *	are always marked up as internal.
	 */
	fr_assert(request_is_internal(request));
	worker_request_time_tracking_start(worker, request, now);
}


/** External request is now complete
 *
 */
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	fr_time_t now = fr_time();

	/*
	 *	All external requests MUST have a listener.
	 */
	fr_assert(request_is_external(request));
	fr_assert(request->async->listen != NULL);

	/*
	 *	Only real packets are in the dedup tree.  And even
	 *	then, only some of the time.
	 */
	if (request->async->listen->track_duplicates) {
		(void) fr_rb_delete(worker->dedup, request);
	}

	/*
	 *	If we're running a real request, then the final
	 *	indentation MUST be zero.  Otherwise we skipped
	 *	something!
	 *
	 *	Also check that the request is NOT marked as
	 *	"yielded", but is in fact done.
	 *
	 *	@todo - check that the stack is at frame 0, otherwise
	 *	more things have gone wrong.
	 */
	fr_assert_msg(request_is_internal(request) || request_is_detached(request) || (request->log.indent.unlang == 0),
		      "Request %s bad log indentation - expected 0 got %u", request->name, request->log.indent.unlang);
	fr_assert_msg(!unlang_interpret_is_resumable(request),
		      "Request %s is marked as yielded at end of processing", request->name);
	fr_assert_msg(unlang_interpret_stack_depth(request) == 0,
		      "Request %s stack depth %u > 0", request->name, unlang_interpret_stack_depth(request));
	RDEBUG("Done request");

	/*
	 *	The request is done.  Track that.
	 */
	worker_request_time_tracking_end(worker, request, now);

	/*
	 *	Remove it from the list of requests associated with this channel.
	 */
	if (fr_dlist_entry_in_list(&request->async->entry)) {
		fr_dlist_entry_unlink(&request->async->entry);
	}

	/*
	 *	These conditions are true when the server is
	 *	exiting and we're stopping all the requests.
	 *
	 *	This should never happen otherwise.
	 */
	if (unlikely((request->master_state == REQUEST_STOP_PROCESSING) &&
		     !fr_channel_active(request->async->channel))) {
		talloc_free(request);
		return;
	}

	worker_send_reply(worker, request, request->master_state != REQUEST_STOP_PROCESSING, now);
	talloc_free(request);
}

/** Internal request (i.e. one generated by the interpreter) is now complete
 *
 * Whatever generated the request is now responsible for freeing it.
 */
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	worker_request_time_tracking_end(worker, request, fr_time());

	fr_assert(!fr_heap_entry_inserted(request->runnable_id));
	fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
	fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
}

/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
 *
 * As the request has no parent, there's nothing else to free it,
 * so we have to.
 */
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	/*
	 *	No time tracking for detached requests
	 *	so we don't need to call
	 *	worker_request_time_tracking_end.
	 */
	fr_assert(!fr_heap_entry_inserted(request->runnable_id));

	/*
	 *	Normally worker_request_time_tracking_end
	 *	would remove the request from the time
	 *	order heap, but we need to do that for
	 *	detached requests.
	 */
	(void) fr_minmax_heap_extract(worker->time_order, request);

	fr_assert(!fr_dlist_entry_in_list(&request->async->entry));

	/*
	 *	Detached requests have to be freed by us
	 *	as nothing else can free them.
	 *
	 *	All other requests must be freed by the
	 *	code which allocated them.
	 */
	talloc_free(request);
}


/** Make us responsible for running the request
 *
 */
static void _worker_request_detach(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	if (request_is_detachable(request)) {
		/*
		 *	End the time tracking... We don't track detached requests,
		 *	because they don't contribute to the time consumed by an
		 *	external request.
		 */
		if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
			RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
			fr_time_tracking_resume(&request->async->tracking, fr_time());
		}
		worker_request_time_tracking_end(worker, request, fr_time());

		if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");

		RDEBUG3("Request is detached");
	} else {
		fr_assert_msg(0, "Request is not detachable");
	}

	return;
}

/** This is called by the interpreter when it wants to stop a request
 *
 * The idea is to get the request into the same state it would be in
 * if the interpreter had just finished with it.
 */
static void _worker_request_stop(request_t *request, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	RDEBUG3("Cleaning up request execution state");

	/*
	 *	Make sure time tracking is always in a
	 *	consistent state when we mark the request
	 *	as done.
	 */
	if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
		RDEBUG3("Forcing time tracking to running state, from yielded, for request stop");
		fr_time_tracking_resume(&request->async->tracking, fr_time());
	}

	/*
	 *	If the request is in the runnable queue
	 *	yank it back out, so it's not "runnable"
	 *	when we call request done.
	 */
	if (fr_heap_entry_inserted(request->runnable_id)) fr_heap_extract(&worker->runnable, request);
}

/** Request is now runnable
 *
 */
static void _worker_request_runnable(request_t *request, void *uctx)
{
	fr_worker_t *worker = uctx;

	RDEBUG3("Request marked as runnable");
	fr_heap_insert(&worker->runnable, request);
}

/** Interpreter yielded request
 *
 */
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
{
	RDEBUG3("Request yielded");
	fr_time_tracking_yield(&request->async->tracking, fr_time());
}

/** Interpreter is starting to work on request again
 *
 */
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
{
	RDEBUG3("Request resuming");
	fr_time_tracking_resume(&request->async->tracking, fr_time());
}

/** Check if a request is scheduled
 *
 */
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
{
	return fr_heap_entry_inserted(request->runnable_id);
}

/** Run a request
 *
 * Until it either yields, or is done.
 *
 * This function is also responsible for sending replies, and
 * cleaning up the request.
 *
 * @param[in] worker the worker
 * @param[in] start the current time
 */
static inline CC_HINT(always_inline) void worker_run_request(fr_worker_t *worker, fr_time_t start)
{
	request_t *request;
	fr_time_t now;

	WORKER_VERIFY;

	now = start;

	/*
	 *	Busy-loop running requests for 1ms.  We still poll the
	 *	event loop 1000 times a second, OR when there's no
	 *	more work to do.  This allows us to make progress with
	 *	ongoing requests, at the expense of sometimes ignoring
	 *	new ones.
	 */
	while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
	       ((request = fr_heap_pop(&worker->runnable)) != NULL)) {

		REQUEST_VERIFY(request);
		fr_assert(!fr_heap_entry_inserted(request->runnable_id));

		/*
		 *	For real requests, if the channel is gone,
		 *	just stop the request and free it.
		 */
		if (request->async->channel && !fr_channel_active(request->async->channel)) {
			worker_stop_request(&request);
			return;
		}

		(void) unlang_interpret(request);

		now = fr_time();
	}
}

/** Create a worker
 *
 * @param[in] ctx the talloc context
 * @param[in] el the event list
 * @param[in] name the name of this worker
 * @param[in] logger the destination for all logging messages
 * @param[in] lvl log level
 * @param[in] config various configuration parameters
 * @return
 *	- NULL on error
 *	- fr_worker_t on success
 */
fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
			      fr_worker_config_t *config)
{
	fr_worker_t *worker;

	worker = talloc_zero(ctx, fr_worker_t);
	if (!worker) {
nomem:
		fr_strerror_const("Failed allocating memory");
		return NULL;
	}

	worker->name = talloc_strdup(worker, name);	/* thread locality */

	if (config) worker->config = *config;

#define CHECK_CONFIG(_x, _min, _max) do { \
		if (!worker->config._x) worker->config._x = _min; \
		if (worker->config._x < _min) worker->config._x = _min; \
		if (worker->config._x > _max) worker->config._x = _max; \
	} while (0)

#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max) do { \
		if (fr_time_delta_lt(worker->config._x, _min)) worker->config._x = _min; \
		if (fr_time_delta_gt(worker->config._x, _max)) worker->config._x = _max; \
	} while (0)
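
	/*
	 *	For example, CHECK_CONFIG(max_channels, 64, 1024) below
	 *	treats 0 as "unset" and replaces it with the minimum,
	 *	then clamps the configured value into the range [64, 1024].
	 */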

	CHECK_CONFIG(max_requests, 1024, (1 << 30));
	CHECK_CONFIG(max_channels, 64, 1024);
	CHECK_CONFIG(talloc_pool_size, 4096, 65536);
	CHECK_CONFIG(message_set_size, 1024, 8192);
	CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
	CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));

	worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
	if (!worker->channel) {
		talloc_free(worker);
		goto nomem;
	}

	worker->thread_id = pthread_self();
	worker->el = el;
	worker->log = logger;
	worker->lvl = lvl;

	/*
	 *	The worker thread starts now.  Manually initialize it,
	 *	because we're tracking request time, not the time that
	 *	the worker thread is running.
	 */
	memset(&worker->tracking, 0, sizeof(worker->tracking));

	worker->aq_control = fr_atomic_queue_alloc(worker, 1024);
	if (!worker->aq_control) {
		fr_strerror_const("Failed creating atomic queue");
	fail:
		talloc_free(worker);
		return NULL;
	}

	worker->control = fr_control_create(worker, el, worker->aq_control);
	if (!worker->control) {
		fr_strerror_const_push("Failed creating control plane");
		goto fail;
	}

	if (fr_control_callback_add(worker->control, FR_CONTROL_ID_CHANNEL, worker, worker_channel_callback) < 0) {
		fr_strerror_const_push("Failed adding control channel");
		goto fail;
	}

	if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN, worker, worker_listen_cancel_callback) < 0) {
		fr_strerror_const_push("Failed adding callback for listeners");
		goto fail;
	}

	worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0);
	if (!worker->runnable) {
		fr_strerror_const("Failed creating runnable heap");
		goto fail;
	}

	worker->time_order = fr_minmax_heap_talloc_alloc(worker, worker_time_order_cmp, request_t, time_order_id, 0);
	if (!worker->time_order) {
		fr_strerror_const("Failed creating time_order heap");
		goto fail;
	}

	worker->dedup = fr_rb_inline_talloc_alloc(worker, request_t, dedup_node, worker_dedup_cmp, NULL);
	if (!worker->dedup) {
		fr_strerror_const("Failed creating de_dup tree");
		goto fail;
	}

	worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
	if (!worker->listeners) {
		fr_strerror_const("Failed creating listener tree");
		goto fail;
	}

	worker->intp = unlang_interpret_init(worker, el,
					     &(unlang_request_func_t){
						.init_internal = _worker_request_internal_init,

						.done_external = _worker_request_done_external,
						.done_internal = _worker_request_done_internal,
						.done_detached = _worker_request_done_detached,

						.detach = _worker_request_detach,
						.stop = _worker_request_stop,
						.yield = _worker_request_yield,
						.resume = _worker_request_resume,
						.mark_runnable = _worker_request_runnable,

						.scheduled = _worker_request_scheduled
					     },
					     worker);
	if (!worker->intp) {
		fr_strerror_const("Failed initialising interpreter");
		goto fail;
	}
	unlang_interpret_set_thread_default(worker->intp);

	return worker;
}


/** The main loop and entry point of the stand-alone worker thread.
 *
 * Where there is only one thread, the event loop runs fr_worker_pre_event() and fr_worker_post_event()
 * instead, and then fr_worker_post_event() takes care of calling worker_run_request() to actually run the
 * request.
 *
 * @param[in] worker the worker data structure to manage
 */
void fr_worker(fr_worker_t *worker)
{
	WORKER_VERIFY;

	while (true) {
		bool wait_for_event;
		int num_events;

		WORKER_VERIFY;

		/*
		 *	There are runnable requests.  We still service
		 *	the event loop, but we don't wait for events.
		 */
		wait_for_event = (fr_heap_num_elements(worker->runnable) == 0);
		if (wait_for_event) {
			if (worker->exiting && (fr_minmax_heap_num_elements(worker->time_order) == 0)) break;

			DEBUG4("Ready to process requests");
		}

		/*
		 *	Check the event list.  If there's an error
		 *	(e.g. exit), we stop looping and clean up.
		 */
		DEBUG4("Gathering events - %s", wait_for_event ? "will wait" : "will not wait");
		num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
		if (num_events < 0) {
			if (fr_event_loop_exiting(worker->el)) {
				DEBUG4("Event loop exiting");
				break;
			}

			PERROR("Failed retrieving events");
			break;
		}

		DEBUG4("%u event(s) pending", num_events);

		/*
		 *	Service outstanding events.
		 */
		if (num_events > 0) {
			DEBUG4("Servicing event(s)");
			fr_event_service(worker->el);
		}

		/*
		 *	Run any outstanding requests.
		 */
		worker_run_request(worker, fr_time());
	}
}

/** Pre-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);
	request_t *request;

	request = fr_heap_peek(worker->runnable);
	if (!request) return 0;

	/*
	 *	There's work to do.  Tell the event handler to poll
	 *	for IO / timers, but also immediately return to the
	 *	calling function, which has more work to do.
	 */
	return 1;
}


/** Post-event handler
 *
 *	This should be run ONLY in single-threaded mode!
 */
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
{
	fr_worker_t *worker = talloc_get_type_abort(uctx, fr_worker_t);

	worker_run_request(worker, fr_time());	/* Event loop time can be too old, and trigger asserts */
}
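
/*
 *	A sketch of how single-threaded mode might wire these two
 *	handlers into the event loop.  The fr_event_pre_insert() /
 *	fr_event_post_insert() registration calls are assumptions here
 *	(this file only shows the matching fr_event_post_delete() in
 *	worker_exit()); the scheduler owns the real registration:
 *
 *	@code
 *	fr_event_pre_insert(el, fr_worker_pre_event, worker);
 *	fr_event_post_insert(el, fr_worker_post_event, worker);
 *	@endcode
 */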

/** Print debug information about the worker structure
 *
 * @param[in] worker the worker
 * @param[in] fp the file where the debug output is printed.
 */
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
{
	WORKER_VERIFY;

	fprintf(fp, "\tnum_channels = %d\n", worker->num_channels);
	fprintf(fp, "\tstats.in = %" PRIu64 "\n", worker->stats.in);

	fprintf(fp, "\tcalculated (predicted) total CPU time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->predicted) * worker->stats.in);
	fprintf(fp, "\tcalculated (counted) per request time = %" PRIu64 "\n",
		fr_time_delta_unwrap(worker->tracking.running_total) / worker->stats.in);

	fr_time_tracking_debug(&worker->tracking, fp);

}

/** Create a channel to the worker
 *
 * Called by the master (i.e. network) thread when it needs to create
 * a new channel to a particular worker.
 *
 * @param[in] worker the worker
 * @param[in] master the control plane of the master
 * @param[in] ctx the context in which the channel will be created
 */
fr_channel_t *fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
{
	fr_channel_t *ch;
	pthread_t id;
	bool same;

	WORKER_VERIFY;

	id = pthread_self();
	same = (pthread_equal(id, worker->thread_id) != 0);

	ch = fr_channel_create(ctx, master, worker->control, same);
	if (!ch) return NULL;

	fr_channel_set_recv_request(ch, worker, worker_recv_request);

	/*
	 *	Tell the worker about the channel
	 */
	if (fr_channel_signal_open(ch) < 0) {
		talloc_free(ch);
		return NULL;
	}

	return ch;
}

int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
{
	fr_ring_buffer_t *rb;

	/*
	 *	Skip a bunch of work if we're already in the worker thread.
	 */
	if (is_worker_thread(worker)) {
		return fr_worker_listen_cancel_self(worker, li);
	}

	rb = fr_worker_rb_init();
	if (!rb) return -1;

	return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
}

#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
 *
 * @param[in] worker the worker
 */
static void worker_verify(fr_worker_t *worker)
{
	int i;

	(void) talloc_get_type_abort(worker, fr_worker_t);
	fr_atomic_queue_verify(worker->aq_control);

	fr_assert(worker->control != NULL);
	(void) talloc_get_type_abort(worker->control, fr_control_t);

	fr_assert(worker->el != NULL);
	(void) talloc_get_type_abort(worker->el, fr_event_list_t);

	fr_assert(worker->runnable != NULL);
	(void) talloc_get_type_abort(worker->runnable, fr_heap_t);

	fr_assert(worker->dedup != NULL);
	(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);

	for (i = 0; i < worker->config.max_channels; i++) {
		if (!worker->channel[i].ch) continue;

		(void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
	}
}
#endif

int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
{
	if (num < 0) return -1;
	if (num == 0) return 0;

	stats[0] = worker->stats.in;
	if (num >= 2) stats[1] = worker->stats.out;
	if (num >= 3) stats[2] = worker->stats.dup;
	if (num >= 4) stats[3] = worker->stats.dropped;
	if (num >= 5) stats[4] = worker->num_naks;
	if (num >= 6) stats[5] = worker->num_active;

	if (num <= 6) return num;

	return 6;
}
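
/*
 *	For example, a caller with a six element array gets all of the
 *	counters, and the return value says how many were filled in
 *	(at most 6):
 *
 *	@code
 *	uint64_t stats[6];
 *	int n = fr_worker_stats(worker, 6, stats);	// n == 6; stats[0] == packets in
 *	@endcode
 */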

static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
{
	fr_worker_t const *worker = ctx;
	fr_time_delta_t when;

	if ((info->argc == 0) || (strcmp(info->argv[0], "count") == 0)) {
		fprintf(fp, "count.in\t\t\t%" PRIu64 "\n", worker->stats.in);
		fprintf(fp, "count.out\t\t\t%" PRIu64 "\n", worker->stats.out);
		fprintf(fp, "count.dup\t\t\t%" PRIu64 "\n", worker->stats.dup);
		fprintf(fp, "count.dropped\t\t\t%" PRIu64 "\n", worker->stats.dropped);
		fprintf(fp, "count.naks\t\t\t%" PRIu64 "\n", worker->num_naks);
		fprintf(fp, "count.active\t\t\t%" PRIu64 "\n", worker->num_active);
		fprintf(fp, "count.runnable\t\t\t%u\n", fr_heap_num_elements(worker->runnable));
	}

	if ((info->argc == 0) || (strcmp(info->argv[0], "cpu") == 0)) {
		when = worker->predicted;
		fprintf(fp, "cpu.request_time_rtt\t\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.running_total;
		if (fr_time_delta_ispos(when)) when = fr_time_delta_div(when, fr_time_delta_wrap(worker->stats.in - worker->stats.dropped));
		fprintf(fp, "cpu.average_request_time\t%.9f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.running_total;
		fprintf(fp, "cpu.used\t\t\t%.6f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		when = worker->tracking.waiting_total;
		fprintf(fp, "cpu.waiting\t\t\t%.3f\n", fr_time_delta_unwrap(when) / (double)NSEC);

		fr_time_elapsed_fprint(fp, &worker->cpu_time, "cpu.requests", 4);
		fr_time_elapsed_fprint(fp, &worker->wall_clock, "time.requests", 4);
	}

	return 0;
}

fr_cmd_table_t cmd_worker_table[] = {
	{
		.parent = "stats",
		.name = "worker",
		.help = "Statistics for worker threads.",
		.read_only = true
	},

	{
		.parent = "stats worker",
		.add_name = true,
		.name = "self",
		.syntax = "[(count|cpu)]",
		.func = cmd_stats_worker,
		.help = "Show statistics for a specific worker thread.",
		.read_only = true
	},

	CMD_TABLE_END
};
static int const char char buffer[256]
Definition acutest.h:576
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition app_io.h:55
size_t default_reply_size
same for replies
Definition app_io.h:40
size_t default_message_size
Usually maximum message size.
Definition app_io.h:39
fr_io_nak_t nak
Function to send a NAK.
Definition app_io.h:62
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition app_io.h:54
fr_io_decode_t decode
Translate raw bytes into fr_pair_ts and metadata.
Definition application.h:80
fr_io_encode_t encode
Pack fr_pair_ts back into a byte array.
Definition application.h:85
#define fr_atexit_thread_local(_name, _free, _uctx)
Definition atexit.h:221
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define atomic_uint64_t
#define RCSID(id)
Definition build.h:483
#define NDEBUG_UNUSED
Definition build.h:326
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:112
#define unlikely(_x)
Definition build.h:381
#define UNUSED
Definition build.h:315
unlang_action_t unlang_call_push(request_t *request, CONF_SECTION *server_cs, bool top_frame)
Push a call frame onto the stack.
Definition call.c:147
fr_table_num_sorted_t const channel_signals[]
Definition channel.c:153
fr_channel_t * fr_channel_create(TALLOC_CTX *ctx, fr_control_t *requestor, fr_control_t *responder, bool same)
Create a new channel.
Definition channel.c:183
fr_channel_event_t fr_channel_service_message(fr_time_t when, fr_channel_t **p_channel, void const *data, size_t data_size)
Service a control-plane message.
Definition channel.c:685
int fr_channel_set_recv_request(fr_channel_t *ch, void *uctx, fr_channel_recv_callback_t recv_request)
Definition channel.c:938
void * fr_channel_responder_uctx_get(fr_channel_t *ch)
Get responder-specific data from a channel.
Definition channel.c:897
bool fr_channel_recv_request(fr_channel_t *ch)
Receive a request message from the channel.
Definition channel.c:472
int fr_channel_null_reply(fr_channel_t *ch)
Don't send a reply message into the channel.
Definition channel.c:624
void fr_channel_responder_uctx_add(fr_channel_t *ch, void *uctx)
Add responder-specific data to a channel.
Definition channel.c:885
int fr_channel_send_reply(fr_channel_t *ch, fr_channel_data_t *cd)
Send a reply message into the channel.
Definition channel.c:511
bool fr_channel_active(fr_channel_t *ch)
Check if a channel is active.
Definition channel.c:812
int fr_channel_responder_ack_close(fr_channel_t *ch)
Acknowledge that the channel is closing.
Definition channel.c:854
int fr_channel_signal_open(fr_channel_t *ch)
Send a channel to a responder.
Definition channel.c:952
A full channel, which consists of two ends.
Definition channel.c:144
fr_message_t m
the message header
Definition channel.h:105
fr_channel_event_t
Definition channel.h:67
@ FR_CHANNEL_NOOP
Definition channel.h:74
@ FR_CHANNEL_EMPTY
Definition channel.h:75
@ FR_CHANNEL_CLOSE
Definition channel.h:72
@ FR_CHANNEL_ERROR
Definition channel.h:68
@ FR_CHANNEL_DATA_READY_REQUESTOR
Definition channel.h:70
@ FR_CHANNEL_OPEN
Definition channel.h:71
@ FR_CHANNEL_DATA_READY_RESPONDER
Definition channel.h:69
void * packet_ctx
Packet specific context for holding client information, and other proto_* specific information that n...
Definition channel.h:142
fr_listen_t * listen
for tracking packet transport, etc.
Definition channel.h:146
uint32_t priority
Priority of this packet.
Definition channel.h:140
Channel information which is added to a message.
Definition channel.h:104
int argc
current argument count
Definition command.h:39
char const * parent
e.g. "show module"
Definition command.h:52
#define CMD_TABLE_END
Definition command.h:62
char const ** argv
text version of commands
Definition command.h:42
#define FR_CONTROL_ID_LISTEN_DEAD
Definition control.h:61
#define FR_CONTROL_ID_CHANNEL
Definition control.h:56
#define FR_CONTROL_ID_LISTEN
Definition control.h:57
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static fr_ring_buffer_t * rb
#define fr_cond_assert(_x)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:139
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:210
#define fr_cond_assert_msg(_x, _fmt,...)
Calls panic_action ifndef NDEBUG, else logs error and evaluates to value of _x.
Definition debug.h:156
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:260
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:486
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:163
static void fr_dlist_entry_unlink(fr_dlist_t *entry)
Remove an item from the dlist when we don't have access to the head.
Definition dlist.h:146
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:939
static void * fr_dlist_pop_head(fr_dlist_head_t *list_head)
Remove the head item in a list.
Definition dlist.h:672
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:378
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:138
Head of a doubly linked list.
Definition dlist.h:51
#define fr_event_timer_at(...)
Definition event.h:250
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:322
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
static unsigned int fr_heap_num_elements(fr_heap_t *h)
Return the number of elements in the heap.
Definition heap.h:179
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
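The worker's runnable queue (fr_worker_t.runnable, below) is one of these heaps. A sketch of the allocate/insert/peek/pop cycle under illustrative names; note that fr_heap_insert() and fr_heap_pop() take fr_heap_t ** because the heap may be reallocated as it grows:

typedef struct {
	uint32_t	priority;
	fr_heap_index_t	idx;	/* bookkeeping maintained by the heap */
} job_t;

/* int8_t ordering: negative, zero, or positive */
static int8_t job_cmp(void const *one, void const *two)
{
	job_t const *a = one, *b = two;
	return (a->priority > b->priority) - (a->priority < b->priority);
}

static void heap_sketch(TALLOC_CTX *ctx)
{
	fr_heap_t *h = fr_heap_talloc_alloc(ctx, job_cmp, job_t, idx, 0);	/* 0: default initial size (assumed) */
	job_t *j = talloc_zero(ctx, job_t);

	j->priority = 10;
	fr_heap_insert(&h, j);	/* the talloc variant verifies j really is a job_t */
	j = fr_heap_peek(h);	/* top element, left in place */
	j = fr_heap_pop(&h);	/* top element, removed */
}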
rlm_rcode_t unlang_interpret(request_t *request)
Run the interpreter for the current request.
Definition interpret.c:770
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:1745
int unlang_interpret_stack_depth(request_t *request)
Return the depth of the request's stack.
Definition interpret.c:1275
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:1776
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize a unlang compiler / interpret.
Definition interpret.c:1703
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1196
bool unlang_interpret_is_resumable(request_t *request)
Check if a request is resumable.
Definition interpret.c:1341
#define UNLANG_TOP_FRAME
Definition interpret.h:35
External functions provided by the owner of the interpreter.
Definition interpret.h:100
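These interpreter entry points carry the worker's actual request execution: the worker builds its own interpreter with unlang_interpret_init(), attaches it to each request, and runs the request until it finishes or yields. A condensed sketch of that flow (the unlang_request_func_t table is elided; the worker's real versions are the _worker_request_* callbacks indexed below):

static void run_one_sketch(unlang_interpret_t *intp, request_t *request)
{
	rlm_rcode_t rcode;

	unlang_interpret_set(request, intp);	/* this worker's interpreter owns the request */
	rcode = unlang_interpret(request);	/* run until completion or yield */

	if (unlang_interpret_is_resumable(request)) return;	/* yielded; resumed later */

	(void) rcode;	/* finished; the rcode feeds the reply path */
	/* to abort instead: unlang_interpret_signal(request, FR_SIGNAL_CANCEL); */
}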
uint64_t out
Definition base.h:43
uint64_t dup
Definition base.h:44
uint64_t dropped
Definition base.h:45
uint64_t in
Definition base.h:42
int fr_control_callback_add(fr_control_t *c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:417
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:343
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq)
Create a control-plane signaling path.
Definition control.c:149
The control structure.
Definition control.c:79
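Together, these calls form the worker's control plane: an fr_control_t is bound to the worker's atomic queue, per-ID callbacks are registered on it, and other threads push messages at it. A sketch of both sides, assuming fr_control_callback_t has the same shape as worker_channel_callback() below:

static void my_channel_cb(void *ctx, void const *data, size_t data_size, fr_time_t now)
{
	/* decode and dispatch; worker_channel_callback() is the real version */
}

static void control_sketch(TALLOC_CTX *ctx, fr_event_list_t *el,
			   fr_atomic_queue_t *aq, fr_ring_buffer_t *rb)
{
	fr_control_t	*control = fr_control_create(ctx, el, aq);
	uint64_t	msg = 1;	/* illustrative payload */

	/* receiving side: run my_channel_cb for FR_CONTROL_ID_CHANNEL messages */
	fr_control_callback_add(control, FR_CONTROL_ID_CHANNEL, NULL, my_channel_cb);

	/* sending side (normally another thread, with its own ring buffer) */
	fr_control_message_send(control, rb, FR_CONTROL_ID_CHANNEL, &msg, sizeof(msg));
}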
void const * app_instance
Definition listen.h:38
fr_app_t const * app
Definition listen.h:37
void const * app_io_instance
I/O path configuration context.
Definition listen.h:32
CONF_SECTION * server_cs
CONF_SECTION of the server.
Definition listen.h:40
fr_app_io_t const * app_io
I/O path functions.
Definition listen.h:31
Minimal data structure to use the new code.
Definition listen.h:58
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define RDEBUG3(fmt,...)
Definition log.h:343
#define RWARN(fmt,...)
Definition log.h:297
#define RERROR(fmt,...)
Definition log.h:298
#define DEBUG4(_fmt,...)
Definition log.h:267
#define RPERROR(fmt,...)
Definition log.h:302
#define RPEDEBUG(fmt,...)
Definition log.h:376
int fr_event_post_delete(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Delete a post-event callback from the event list.
Definition event.c:2335
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2549
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2414
talloc_free(reap)
bool fr_event_loop_exiting(fr_event_list_t *el)
Check to see whether the event loop is in the process of exiting.
Definition event.c:2755
Stores all information relating to an event list.
Definition event.c:411
A timer event.
Definition event.c:102
fr_log_lvl_t
Definition log.h:67
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
unsigned int uint32_t
long int ssize_t
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:190
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:988
void fr_message_set_gc(fr_message_set_t *ms)
Garbage collect the message set.
Definition message.c:1238
fr_message_t * fr_message_reserve(fr_message_set_t *ms, size_t reserve_size)
Reserve a message.
Definition message.c:934
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size)
Create a message set.
Definition message.c:127
A Message set, composed of message headers and ring buffer data.
Definition message.c:95
size_t rb_size
cache-aligned size in the ring buffer
Definition message.h:51
fr_time_t when
when this message was sent
Definition message.h:47
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
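This message-set API backs the worker's reply allocation in worker_send_reply(): reserve worst-case space, shrink to the bytes actually written, and mark the message done once the consumer has finished with it. A sketch with illustrative sizes (not the worker's real defaults):

static void message_sketch(TALLOC_CTX *ctx)
{
	fr_message_set_t *ms;
	fr_message_t	 *m;

	ms = fr_message_set_create(ctx, 64, sizeof(fr_channel_data_t), 16 * 1024);

	m = fr_message_reserve(ms, 4096);	/* worst-case reservation */
	/* ... encode a packet into m->data ... */
	m = fr_message_alloc(ms, m, 512);	/* shrink to the real packet size */

	fr_message_done(m);			/* consumer hands the space back */
	fr_message_set_gc(ms);			/* force garbage collection (optional) */
}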
int fr_minmax_heap_insert(fr_minmax_heap_t *hp, void *data)
void * fr_minmax_heap_min_peek(fr_minmax_heap_t *hp)
void * fr_minmax_heap_max_peek(fr_minmax_heap_t *hp)
unsigned int fr_minmax_heap_num_elements(fr_minmax_heap_t *hp)
Return the number of elements in the minmax heap.
int fr_minmax_heap_extract(fr_minmax_heap_t *hp, void *data)
static bool fr_minmax_heap_entry_inserted(fr_minmax_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition minmax_heap.h:93
#define fr_minmax_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a minmax heap that verifies elements are of a specific talloc type.
Definition minmax_heap.h:85
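The min-max heap is what makes the worker's time_order heap cheap to inspect from both ends: the minimum is the oldest live request (the aging-out candidate), the maximum the newest. A sketch; unlike the plain heap above, these calls take fr_minmax_heap_t * directly:

static void time_order_sketch(fr_minmax_heap_t *hp, request_t *request)
{
	request_t *oldest, *newest;

	fr_minmax_heap_insert(hp, request);

	oldest = fr_minmax_heap_min_peek(hp);	/* candidate for max_request_time */
	newest = fr_minmax_heap_max_peek(hp);
	(void) newest;

	if (oldest) fr_minmax_heap_extract(hp, oldest);	/* remove a specific entry */
}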
static const conf_parser_t config[]
Definition base.c:183
#define fr_assert(_expr)
Definition rad_assert.h:38
#define REDEBUG(fmt,...)
Definition radclient.h:52
#define RDEBUG(fmt,...)
Definition radclient.h:53
#define DEBUG2(fmt,...)
Definition radclient.h:43
static void send_reply(int sockfd, fr_channel_data_t *reply)
static bool cleanup
Definition radsniff.c:60
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
bool fr_rb_delete(fr_rb_tree_t *tree, void const *data)
Remove a node and free its data (if a free function was specified).
Definition rb.c:741
#define fr_rb_inline_talloc_alloc(_ctx, _type, _field, _data_cmp, _data_free)
Allocates a red-black tree that verifies elements are of a specific talloc type.
Definition rb.h:246
The main red black tree structure.
Definition rb.h:73
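The worker's listeners tree (fr_worker_t.listeners, below) uses this red-black tree API so requests can be cancelled when their listener goes away. A sketch of alloc/insert/find/delete; the tracked_t type and its fields are illustrative:

typedef struct {
	int		fd;	/* illustrative key */
	fr_rb_node_t	node;	/* embedded tree node */
} tracked_t;

static int8_t tracked_cmp(void const *one, void const *two)
{
	tracked_t const *a = one, *b = two;
	return (a->fd > b->fd) - (a->fd < b->fd);
}

static void rb_sketch(TALLOC_CTX *ctx)
{
	fr_rb_tree_t	*tree = fr_rb_inline_talloc_alloc(ctx, tracked_t, node, tracked_cmp, NULL);
	tracked_t	*t = talloc_zero(ctx, tracked_t), needle = { .fd = 5 };

	t->fd = 5;
	fr_rb_insert(tree, t);		/* returns false on duplicate */

	t = fr_rb_find(tree, &needle);	/* lookup compares key fields only */
	if (t) fr_rb_delete(tree, t);	/* unlinks; frees only if a free fn was given */
}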
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:40
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:668
#define REQUEST_VERIFY(_x)
Definition request.h:276
#define request_is_detached(_x)
Definition request.h:160
#define request_is_external(_x)
Definition request.h:158
#define request_is_internal(_x)
Definition request.h:159
#define request_is_detachable(_x)
Definition request.h:161
#define request_alloc_external(_ctx, _args)
Allocate a new external request.
Definition request.h:295
@ REQUEST_STOP_PROCESSING
Request has been signalled to stop.
Definition request.h:62
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:64
static char const * name
@ FR_SIGNAL_DUP
A duplicate request was received.
Definition signal.h:44
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
return count
Definition module.c:163
#define fr_time()
Allow us to arbitrarily manipulate time.
Definition state_test.c:8
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
Definition log.h:96
#define fr_table_str_by_value(_table, _number, _def)
Convert an integer to a string.
Definition table.h:772
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:224
void fr_time_elapsed_update(fr_time_elapsed_t *elapsed, fr_time_t start, fr_time_t end)
Definition time.c:584
void fr_time_elapsed_fprint(FILE *fp, fr_time_elapsed_t const *elapsed, char const *prefix, int tab_offset)
Definition time.c:629
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
static int64_t fr_time_delta_unwrap(fr_time_delta_t time)
Definition time.h:154
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
static fr_time_delta_t fr_time_delta_from_sec(int64_t sec)
Definition time.h:590
#define fr_time_delta_wrap(_time)
Definition time.h:152
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_eq(_a, _b)
Definition time.h:241
#define NSEC
Definition time.h:379
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
#define fr_time_gt(_a, _b)
Definition time.h:237
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
static fr_time_delta_t fr_time_delta_div(fr_time_delta_t a, fr_time_delta_t b)
Definition time.h:267
static int8_t fr_time_cmp(fr_time_t a, fr_time_t b)
Compare two fr_time_t values.
Definition time.h:916
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
fr_time_delta_t waiting_total
total time spent waiting
fr_time_delta_t running_total
total time spent running
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
static void fr_time_tracking_debug(fr_time_tracking_t *tt, FILE *fp)
Print debug information about the time tracking structure.
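The time-tracking calls above form a per-request lifecycle: start when the request first runs (parented under the worker-wide tracker, fr_worker_t.tracking below), yield/resume around I/O waits, and end when the reply goes out. A sketch of the full sequence:

static void tracking_sketch(fr_time_tracking_t *worker_tt, fr_time_tracking_t *req_tt)
{
	fr_time_delta_t predicted = fr_time_delta_wrap(0);

	fr_time_tracking_start(worker_tt, req_tt, fr_time());	/* request begins running */
	fr_time_tracking_yield(req_tt, fr_time());		/* request blocks on I/O */
	fr_time_tracking_resume(req_tt, fr_time());		/* request runs again */

	/* done: also updates the rolling per-request cost estimate */
	fr_time_tracking_end(&predicted, req_tt, fr_time());
}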
static fr_event_list_t * el
int unlang_thread_instantiate(TALLOC_CTX *ctx)
Create thread-specific data structures for unlang.
Definition compile.c:5122
void fr_perror(char const *fmt,...)
Print the current error to stderr with a prefix.
Definition strerror.c:733
#define fr_strerror_const_push(_msg)
Definition strerror.h:227
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1265
#define fr_box_time_delta(_val)
Definition value.h:343
static int8_t worker_time_order_cmp(void const *one, void const *two)
Track a request_t in the "time_order" heap.
Definition worker.c:979
fr_heap_t * runnable
current runnable requests which we've spent time processing
Definition worker.c:113
static void worker_request_time_tracking_end(fr_worker_t *worker, request_t *request, fr_time_t now)
Definition worker.c:612
static void _worker_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition worker.c:1274
static void _worker_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Detached request (i.e.
Definition worker.c:1170
static void worker_channel_callback(void *ctx, void const *data, size_t data_size, fr_time_t now)
Handle a control plane message sent to the worker via a channel.
Definition worker.c:248
fr_event_list_t * el
our event list
Definition worker.c:109
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1551
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now)
Send a response packet to the network side.
Definition worker.c:629
fr_channel_t * fr_worker_channel_create(fr_worker_t *worker, TALLOC_CTX *ctx, fr_control_t *master)
Create a channel to the worker.
Definition worker.c:1609
fr_rb_tree_t * listeners
so we can cancel requests when a listener goes away
Definition worker.c:117
fr_channel_t * ch
Definition worker.c:82
static void worker_run_request(fr_worker_t *worker, fr_time_t start)
Run a request.
Definition worker.c:1307
static void worker_exit(fr_worker_t *worker)
Definition worker.c:226
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1356
#define WORKER_VERIFY
Definition worker.c:73
bool was_sleeping
used to suppress multiple sleep signals in a row
Definition worker.c:129
static int cmd_stats_worker(FILE *fp, UNUSED FILE *fp_err, void *ctx, fr_cmd_info_t const *info)
Definition worker.c:1702
static void _worker_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition worker.c:1263
static char * itoa_internal(TALLOC_CTX *ctx, uint64_t number)
Definition worker.c:754
static int8_t worker_dedup_cmp(void const *one, void const *two)
Track a request_t in the "dedup" tree.
Definition worker.c:989
static void worker_stop_request(request_t **request_p)
Signal the unlang interpreter that it needs to stop running the request.
Definition worker.c:507
fr_worker_channel_t * channel
list of channels
Definition worker.c:136
char const * name
name of this worker
Definition worker.c:95
uint64_t num_active
number of active requests
Definition worker.c:124
fr_event_timer_t const * ev_cleanup
timer for max_request_time
Definition worker.c:134
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1738
static _Thread_local fr_ring_buffer_t * fr_worker_rb
Definition worker.c:79
fr_minmax_heap_t * time_order
time ordered heap of requests
Definition worker.c:114
int fr_worker_stats(fr_worker_t const *worker, int num, uint64_t *stats)
Definition worker.c:1685
static void worker_request_time_tracking_start(fr_worker_t *worker, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition worker.c:587
fr_dlist_head_t dlist
Definition worker.c:88
static void _worker_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on the request again.
Definition worker.c:1283
fr_rb_tree_t * dedup
de-dup tree
Definition worker.c:115
fr_atomic_queue_t * aq_control
atomic queue for control messages sent to me
Definition worker.c:105
static void worker_nak(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Send a NAK to the network thread.
Definition worker.c:414
static void worker_request_name_number(request_t *request)
Definition worker.c:794
static void _worker_request_stop(request_t *request, void *uctx)
This is called by the interpreter when it wants to stop a request.
Definition worker.c:1236
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now)
Definition worker.c:801
fr_log_t const * log
log destination
Definition worker.c:102
fr_io_stats_t stats
input / output stats
Definition worker.c:119
#define CHECK_CONFIG(_x, _min, _max)
static void _worker_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition worker.c:1205
static int _fr_worker_rb_free(void *arg)
Definition worker.c:164
fr_time_tracking_t tracking
how much time the worker has spent doing things.
Definition worker.c:127
static void _worker_request_done_external(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
External request is now complete.
Definition worker.c:1084
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1012
uint64_t num_naks
number of messages which were nak'd
Definition worker.c:123
static void worker_request_init(fr_worker_t *worker, request_t *request, fr_time_t now)
Initialize various request fields needed by the worker.
Definition worker.c:777
fr_worker_config_t config
external configuration
Definition worker.c:96
fr_listen_t const * listener
incoming packets
Definition worker.c:140
unlang_interpret_t * intp
Worker's local interpreter.
Definition worker.c:98
fr_time_t checked_timeout
when we last checked the tails of the queues
Definition worker.c:132
static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:365
bool exiting
are we exiting?
Definition worker.c:130
fr_log_lvl_t lvl
log level
Definition worker.c:103
static void worker_max_request_timer(fr_worker_t *worker)
See when we next need to service the time_order heap for "too old" packets.
Definition worker.c:564
static void worker_requests_cancel(fr_worker_channel_t *ch)
Definition worker.c:217
int num_channels
actual number of channels
Definition worker.c:111
static atomic_uint64_t request_number
Definition worker.c:77
fr_time_elapsed_t cpu_time
histogram of total CPU time per request
Definition worker.c:120
fr_rb_node_t node
in tree of listeners
Definition worker.c:142
int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
Definition worker.c:1636
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1572
fr_dlist_head_t dlist
of requests associated with this listener.
Definition worker.c:148
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1493
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition worker.c:126
pthread_t thread_id
my thread ID
Definition worker.c:100
static void worker_recv_request(void *ctx, fr_channel_t *ch, fr_channel_data_t *cd)
Callback which handles a message being received on the worker side.
Definition worker.c:207
fr_time_elapsed_t wall_clock
histogram of wall clock time per request
Definition worker.c:121
static int8_t worker_listener_cmp(void const *one, void const *two)
Definition worker.c:152
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition worker.c:527
static bool is_worker_thread(fr_worker_t const *worker)
Definition worker.c:191
static fr_ring_buffer_t * fr_worker_rb_init(void)
Initialise thread local storage.
Definition worker.c:173
fr_control_t * control
the control plane
Definition worker.c:107
static bool _worker_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition worker.c:1292
static void _worker_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e.
Definition worker.c:1154
void fr_worker_debug(fr_worker_t *worker, FILE *fp)
Print debug information about the worker structure.
Definition worker.c:1584
static void _worker_request_internal_init(request_t *request, void *uctx)
Internal request (i.e.
Definition worker.c:1065
#define CACHE_LINE_SIZE
Definition worker.c:76
#define CHECK_CONFIG_TIME_DELTA(_x, _min, _max)
static int8_t worker_runnable_cmp(void const *one, void const *two)
Track a request_t in the "runnable" heap.
Definition worker.c:962
static void worker_listen_cancel_callback(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A socket is going away, so clean up any requests which use this socket.
Definition worker.c:392
A worker which takes packets from a master, and processes them.
Definition worker.c:94
int message_set_size
default start number of messages
Definition worker.h:63
int max_requests
max requests this worker will handle
Definition worker.h:59
int max_channels
maximum number of channels
Definition worker.h:61
int ring_buffer_size
default start size for the ring buffers
Definition worker.h:64
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:66
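Taken together, the scheduler's side of this API reduces to: fill in an fr_worker_config_t, create the worker, run fr_worker() on the new thread, and tear it down on exit. A sketch under those assumptions; the values below are illustrative rather than the server's defaults, and L_DBG_LVL_1 is assumed to be a valid fr_log_lvl_t:

static void spawn_sketch(TALLOC_CTX *ctx, fr_event_list_t *el, fr_log_t const *log)
{
	fr_worker_config_t cfg = {
		.max_requests		= 1024,
		.max_channels		= 64,
		.message_set_size	= 64,
		.ring_buffer_size	= 16 * 1024,
		.max_request_time	= fr_time_delta_from_sec(30),
	};
	fr_worker_t *worker;

	worker = fr_worker_create(ctx, el, "worker-0", log, L_DBG_LVL_1, &cfg);
	if (!worker) return;

	fr_worker(worker);		/* blocks: this is the worker thread's main loop */
	fr_worker_destroy(worker);
}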