The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
coord_pair.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: d116ad620a109a0a53505763b64f93e24cac018d $
19 *
20 * @brief Sending pair lists to and from coordination threads
21 * @file io/coord_pair.c
22 *
23 * @copyright 2026 Network RADIUS SAS (legal@networkradius.com)
24 */
25RCSID("$Id: d116ad620a109a0a53505763b64f93e24cac018d $")
26
27#include <freeradius-devel/internal/internal.h>
28#include <freeradius-devel/io/listen.h>
29#include <freeradius-devel/io/coord_pair.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/server/main_config.h>
32#include <freeradius-devel/unlang/base.h>
33
34static _Atomic(uint64_t) request_number = 0;
35
38
39static fr_dlist_head_t *coord_pair_regs = NULL;
40static module_list_t *coord_pair_modules;
41static fr_dict_attr_t const *attr_worker_id = NULL;
42
43/** Registration of pair list callbacks
44 *
45 */
46struct fr_coord_pair_reg_s {
47 fr_dlist_t entry; //!< Entry in list of pair list registrations
48 fr_dict_attr_t const *attr_packet_type; //!< Attribute containing packet type
49 fr_dict_attr_t const *root; //!< Pair list decoding root attribute
50 fr_coord_worker_pair_cb_reg_t **callbacks; //!< Array of pointers to callbacks
51 uint32_t max_packet_type; //!< Largest valid value for packet type
52 uint32_t cb_id; //!< The coordinator callback ID used for pair list handling
53 fr_time_delta_t max_request_time; //!< Maximum time for coordinator request processing.
54 fr_slab_config_t reuse; //!< Request slab allocation config.
55 virtual_server_t const *vs; //!< Virtual server containing coordinator process sections.
56};
57
59 fr_coord_t *coord; //!< Coordinator which this coord pair is attached to.
60 fr_coord_pair_reg_t *coord_pair_reg; //!< Registration details for this coord pair
61 fr_event_list_t *el; //!< Event list for interpreter.
62 unlang_interpret_t *intp; //!< Interpreter for running requests.
63 fr_heap_t *runnable; //!< Current runnable requests.
64
65 fr_timer_list_t *timeout; //!< Track when requests timeout using a dlist.
66 fr_time_delta_t predicted; //!< How long we predict a request will take to execute.
67 fr_time_tracking_t tracking; //!< How much time the coordinator has spent doing things.
68 uint64_t num_active; //!< Number of active requests.
69 request_slab_list_t *slab; //!< slab allocator for request_t
70};
71
72/** Packet context used when coordinator messages are processed through an interpreter
73 *
74 * Allows access to the coordinator structure and arbitrary data
75 * throughout the state machine.
76 */
77typedef struct {
78 fr_coord_pair_t *coord_pair; //!< Coordinator pair this packet is for.
79 void *uctx; //!< Source specific ctx.
81
82/** Conf parser to read slab settings from module config
83 */
88
89/** Remove a coord pair registration from the list when it is freed
90 */
92{
93 fr_assert(coord_pair_regs);
94
95 fr_dlist_remove(coord_pair_regs, to_free);
96
97 /* If all the registrations are gone, free the list */
98 if (fr_dlist_num_elements(coord_pair_regs) == 0) {
99 TALLOC_FREE(coord_pair_regs);
100 TALLOC_FREE(coord_pair_modules);
101 }
102 return 0;
103}
104
105/** Register a set of callbacks for pair list based coordinator messages
106 *
107 * Returns a structure to pass as uctx to fr_coord_cb_t using the
108 * macro FR_COORD_PAIR_CB_CTX_SET.
109 *
110 * @param ctx to allocate the registration under.
111 * @param reg_ctx Callback details to register.
112 */
114{
115 fr_coord_pair_reg_t *coord_pair_reg;
116 fr_coord_worker_pair_cb_reg_t *cb_reg = reg_ctx->worker_cb;
117 CONF_SECTION *cs;
118 CONF_PAIR *cp;
119
120 fr_assert(reg_ctx->root);
121
122 /* Resolve the Worker-Id attribute if not already done */
123 if (!attr_worker_id) {
124 attr_worker_id = fr_dict_attr_by_name(NULL, fr_dict_root(fr_dict_internal()), "Worker-Id");
125 if (!attr_worker_id) {
126 ERROR("Failed to resolve Worker-Id attribute");
127 return NULL;
128 }
129 }
130
131 if (!coord_pair_regs) {
132 MEM(coord_pair_regs = talloc_zero(NULL, fr_dlist_head_t));
133 fr_dlist_init(coord_pair_regs, fr_coord_pair_reg_t, entry);
134 MEM(coord_pair_modules = module_list_alloc(NULL, &module_list_type_global, "coord", true));
135 }
136
137 MEM(coord_pair_reg = talloc(ctx, fr_coord_pair_reg_t));
138 *coord_pair_reg = (fr_coord_pair_reg_t) {
139 .root = reg_ctx->root,
140 .cb_id = reg_ctx->cb_id,
141 .max_request_time = fr_time_delta_eq(reg_ctx->max_request_time, fr_time_delta_from_msec(0)) ?
143 };
144
145 while (cb_reg->callback) {
146 if (cb_reg->packet_type > coord_pair_reg->max_packet_type) {
147 coord_pair_reg->max_packet_type = cb_reg->packet_type;
148 }
149 cb_reg++;
150 }
151
152 /*
153 * A sane limit on packet type values to avoid a huge array.
154 * If larger values are needed in the future we can use a folded array.
155 */
156 fr_assert(coord_pair_reg->max_packet_type <= 256);
157
158 MEM(coord_pair_reg->callbacks = talloc_zero_array(coord_pair_reg, fr_coord_worker_pair_cb_reg_t *,
159 coord_pair_reg->max_packet_type + 1));
160
161 cb_reg = reg_ctx->worker_cb;
162 while (cb_reg->callback) {
163 coord_pair_reg->callbacks[cb_reg->packet_type] = cb_reg;
164 cb_reg++;
165 }
166
167 cs = cf_section_find(reg_ctx->cs, "reuse", NULL);
168
169 /*
170 * Create an empty "reuse" section if one is not found, so defaults are applied
171 */
172 if (!cs) {
173 cs = cf_section_alloc(reg_ctx->cs, reg_ctx->cs, "reuse", NULL);
174 }
175
177 fail:
178 talloc_free(coord_pair_reg);
179 return NULL;
180 }
181 if (cf_section_parse(coord_pair_reg, &coord_pair_reg->reuse, cs) < 0) goto fail;
182
183 /*
184 * Set defaults for request slab allocation, if not set by conf parsing
185 */
186 if (coord_pair_reg->reuse.child_pool_size == 0) coord_pair_reg->reuse.child_pool_size = REQUEST_POOL_SIZE;
187 if (coord_pair_reg->reuse.num_children == 0) coord_pair_reg->reuse.num_children = REQUEST_POOL_HEADERS;
188
189 cp = cf_pair_find(reg_ctx->cs, "virtual_server");
190 if (!cp) {
191 cf_log_err(reg_ctx->cs, "Missing virtual_server option");
192 goto fail;
193 }
194
195 coord_pair_reg->vs = virtual_server_find(cf_pair_value(cp));
196 if (!coord_pair_reg->vs) {
197 cf_log_err(cp, "Virtual server not found");
198 goto fail;
199 }
200
201 /*
202 * Validate that the virtual server uses the correct namespace.
203 */
204 if (reg_ctx->root->dict != virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs))) {
205 cf_log_err(cp, "Virtual server has namespace %s, should be %s",
206 fr_dict_root(virtual_server_dict_by_cs(virtual_server_cs(coord_pair_reg->vs)))->name,
207 fr_dict_root(coord_pair_reg->root->dict)->name);
208 goto fail;
209 }
210 coord_pair_reg->attr_packet_type = virtual_server_packet_type_by_cs(virtual_server_cs(coord_pair_reg->vs));
211
212 fr_dlist_insert_tail(coord_pair_regs, coord_pair_reg);
213 talloc_set_destructor(coord_pair_reg, _coord_pair_reg_free);
214
215 return coord_pair_reg;
216}
217
218/** Return the coordinator callback ID associated with a coord_pair_reg_t
219 */
221{
222 fr_assert(coord_pair_reg);
223 return coord_pair_reg->cb_id;
224}
225
226/*
227 * The following set of callbacks for request handling are mirrors of
228 * their equivalent in worker.c
229 */
230
231/** Signal the unlang interpreter that it needs to stop running the request
232 *
233 * @param[in] request request to cancel. The request may still run to completion.
234 */
236{
238}
239
240/** Enforce max_request_time
241 *
242 * @param[in] tl the coordinator's timer list.
243 * @param[in] when the current time
244 * @param[in] uctx the request_t timing out.
245 */
247{
248 request_t *request = talloc_get_type_abort(uctx, request_t);
249
250 REDEBUG("Request has reached max_request_time - signalling it to stop");
252
253 request->rcode = RLM_MODULE_TIMEOUT;
254}
255
256/** Set, or re-set the request timer
257 *
258 * @param[in] coord_pair the coord_pair_t containing the timeout lists.
259 * @param[in] request that we're timing out.
260 * @param[in] timeout the timeout to set.
261 * @return
262 * - 0 on success.
263 * - -1 on failure.
264 */
266{
267 if (unlikely(fr_timer_in(request, coord_pair->timeout, &request->timeout, timeout,
268 true, _coord_pair_request_timeout, request) < 0)) {
269 RERROR("Failed to create request timeout timer");
270 return -1;
271 }
272
273 return 0;
274}
275
276/** Start time tracking for a request, and mark it as runnable.
277 */
279{
280 fr_assert(!fr_timer_armed(request->timeout));
281
282 if (unlikely(fr_coord_pair_request_timeout_set(coord_pair, request,
283 coord_pair->coord_pair_reg->max_request_time) < 0)) {
284 RERROR("Failed to set request timeout");
285 return -1;
286 }
287
288 RDEBUG3("Time tracking started in yielded state");
289 fr_time_tracking_start(&coord_pair->tracking, &request->async->tracking, now);
290 fr_time_tracking_yield(&request->async->tracking, now);
291 coord_pair->num_active++;
292
293 fr_assert(!fr_heap_entry_inserted(request->runnable));
294 (void) fr_heap_insert(&coord_pair->runnable, request);
295
296 return 0;
297}
298
299/** End time tracking for a request
300 */
302{
303 RDEBUG3("Time tracking ended");
304 fr_time_tracking_end(&coord_pair->predicted, &request->async->tracking, now);
305 fr_assert(coord_pair->num_active > 0);
306 coord_pair->num_active--;
307
308 TALLOC_FREE(request->timeout); /* Disarm the request timer */
309}
310
311
312static inline CC_HINT(always_inline)
313void coord_pair_request_init(fr_event_list_t *el, request_t *request, fr_time_t now, void *packet_ctx)
314{
315 if (!request->packet) MEM(request->packet = fr_packet_alloc(request, false));
316 if (!request->reply) MEM(request->reply = fr_packet_alloc(request, false));
317
318 request->packet->timestamp = now;
319 request->async = talloc_zero(request, fr_async_t);
320 request->async->recv_time = now;
321 request->async->el = el;
322 request->async->packet_ctx = packet_ctx;
323 fr_dlist_entry_init(&request->async->entry);
324}
325
326static inline CC_HINT(always_inline)
328{
329 request->number = atomic_fetch_add_explicit(&request_number, 1, memory_order_seq_cst);
330 if (request->name) talloc_const_free(request->name);
331 request->name = talloc_asprintf(request, "Coord-%"PRIu64, request->number);
332}
333
334static int _coord_pair_request_deinit( request_t *request, UNUSED void *uctx)
335{
336 return request_slab_deinit(request);
337}
338
340 fr_time_t now, void *uctx)
341{
342 request_t *request;
343 int ret;
344 fr_pair_t *vp;
345 fr_coord_packet_ctx_t *packet_ctx;
346
347 request = request_slab_reserve(coord_pair->slab);
348 if (!request) {
349 ERROR("Coordinator failed allocating new request");
350 return;
351 }
352
353 request_slab_element_set_destructor(request, _coord_pair_request_deinit, coord_pair);
354
357 .namespace = virtual_server_dict_by_cs(virtual_server_cs(coord_pair->coord_pair_reg->vs))
358 }))) {
359 ERROR("Coordinator failed initializing new request");
360 error:
361 request_slab_release(request);
362 return;
363 }
364
365 MEM(packet_ctx = talloc(request, fr_coord_packet_ctx_t));
366 *packet_ctx = (fr_coord_packet_ctx_t) {
367 .coord_pair = coord_pair,
368 .uctx = uctx
369 };
370 coord_pair_request_init(coord_pair->el, request, now, packet_ctx);
372
373 unlang_interpret_set(request, coord_pair->intp);
374
375 if (fr_pair_append_by_da(request->request_ctx, &vp, &request->request_pairs, attr_worker_id) < 0) goto error;
376 vp->vp_uint32 = worker_id;
377
378 ret = fr_internal_decode_list_dbuff(request->pair_list.request, &request->request_pairs,
379 fr_dict_root(request->proto_dict), dbuff, NULL);
380 if (ret < 0) {
381 RERROR("Failed decoding packet");
382 goto error;
383 }
384
385 vp = fr_pair_find_by_da(&request->request_pairs, NULL, coord_pair->coord_pair_reg->attr_packet_type);
386 if (!vp) {
387 RERROR("Missing %s attribute", coord_pair->coord_pair_reg->attr_packet_type->name);
388 goto error;
389 }
390
391 request->packet->code = vp->vp_uint32;
392
393 if (virtual_server_push(NULL, request, coord_pair->coord_pair_reg->vs, UNLANG_TOP_FRAME) < 0) {
394 RERROR("Protocol failed to set 'process' function");
395 goto error;
396 }
397
398 coord_pair_request_time_tracking_start(coord_pair, request, now);
399}
400
401static void _coord_pair_request_internal_init(request_t *request, void *uctx)
402{
403 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
404 fr_time_t now = fr_time();
405
406 fr_assert(request->packet);
407 fr_assert(request->reply);
408
409 request->packet->timestamp = now;
410 request->async = talloc_zero(request, fr_async_t);
411 request->async->recv_time = now;
412 request->async->el = coord_pair->el;
413 fr_dlist_entry_init(&request->async->entry);
414
415 /*
416 * Requests generated by the interpreter
417 * are always marked up as internal.
418 */
420 coord_pair_request_time_tracking_start(coord_pair, request, now);
421}
422
423/** External request is now complete - will never happen with coordinators
424 *
425 */
427{
428 fr_assert(0);
429}
430
431/** Internal request (i.e. one generated by the interpreter) is now complete
432 *
433 * Whatever generated the request is now responsible for freeing it.
434 */
435static void _coord_pair_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
436{
437 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
438
439 coord_pair_request_time_tracking_end(coord_pair, request, fr_time());
440
441 fr_assert(!fr_heap_entry_inserted(request->runnable));
442 fr_assert(!fr_timer_armed(request->timeout));
443 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
444}
445
446/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
447 *
448 * As the request has no parent, then there's nothing to free it
449 * so we have to.
450 */
452{
453 fr_assert(!fr_heap_entry_inserted(request->runnable));
454
455 TALLOC_FREE(request->timeout);
456
457 fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
458
459 talloc_free(request);
460}
461
462/** Make us responsible for running the request
463 *
464 */
465static void _coord_pair_request_detach(request_t *request, void *uctx)
466{
467 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
468
469 RDEBUG4("%s - Request detaching", __FUNCTION__);
470
471 if (request_is_detachable(request)) {
472 /*
473 * End the time tracking... We don't track detached requests,
474 * because they don't contribute for the time consumed by an
475 * external request.
476 */
477 if (request->async->tracking.state == FR_TIME_TRACKING_YIELDED) {
478 RDEBUG3("Forcing time tracking to running state, from yielded, for request detach");
479 fr_time_tracking_resume(&request->async->tracking, fr_time());
480 }
481 coord_pair_request_time_tracking_end(coord_pair, request, fr_time());
482
483 if (request_detach(request) < 0) RPEDEBUG("Failed detaching request");
484
485 RDEBUG3("Request is detached");
486 } else {
487 fr_assert_msg(0, "Request is not detachable");
488 }
489}
490
491/** Request is now runnable
492 *
493 */
494static void _coord_pair_request_runnable(request_t *request, void *uctx)
495{
496 fr_coord_pair_t *coord_pair = uctx;
497
498 RDEBUG4("%s - Request marked as runnable", __FUNCTION__);
499 fr_heap_insert(&coord_pair->runnable, request);
500}
501
502/** Interpreter yielded request
503 *
504 */
505static void _coord_pair_request_yield(request_t *request, UNUSED void *uctx)
506{
507 RDEBUG4("%s - Request yielded", __FUNCTION__);
508 if (likely(!request_is_detached(request))) fr_time_tracking_yield(&request->async->tracking, fr_time());
509}
510
511/** Interpreter is starting to work on request again
512 *
513 */
514static void _coord_pair_request_resume(request_t *request, UNUSED void *uctx)
515{
516 RDEBUG4("%s - Request resuming", __FUNCTION__);
517 if (likely(!request_is_detached(request))) fr_time_tracking_resume(&request->async->tracking, fr_time());
518}
519
520/** Check if a request is scheduled
521 *
522 */
523static bool _coord_pair_request_scheduled(request_t const *request, UNUSED void *uctx)
524{
525 return fr_heap_entry_inserted(request->runnable);
526}
527
528/** Update a request's priority
529 *
530 */
531static void _coord_pair_request_prioritise(request_t *request, void *uctx)
532{
533 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
534
535 RDEBUG4("%s - Request priority changed", __FUNCTION__);
536
537 /* Extract the request from the runnable queue _if_ it's in the runnable queue */
538 if (fr_heap_extract(&coord_pair->runnable, request) < 0) return;
539
540 /* Reinsert it to re-evaluate its new priority */
541 fr_heap_insert(&coord_pair->runnable, request);
542}
543
544/** Compare two requests by priority and sequence
545 */
546static int8_t coord_pair_runnable_cmp(void const *one, void const *two)
547{
548 request_t const *a = one, *b = two;
549 int ret;
550
551 ret = CMP(b->priority, a->priority);
552 if (ret != 0) return ret;
553
554 return CMP(a->sequence, b->sequence);
555}
556
557/** Create the coord_pair coord instance data
558 */
560 bool single_thread, void *uctx)
561{
562 fr_coord_pair_t *coord_pair;
563 fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
564
565 MEM(coord_pair = talloc(ctx, fr_coord_pair_t));
566 *coord_pair = (fr_coord_pair_t) {
567 .coord = coord,
568 .coord_pair_reg = coord_pair_reg,
569 .el = el
570 };
571
572 coord_pair->runnable = fr_heap_talloc_alloc(coord_pair, coord_pair_runnable_cmp, request_t, runnable, 0);
573 if (!coord_pair->runnable) {
574 fr_strerror_const("Failed creating runnable heap");
575 fail:
576 talloc_free(coord_pair);
577 return NULL;
578 }
579
580 coord_pair->timeout = fr_timer_list_ordered_alloc(coord_pair, el->tl);
581 if (!coord_pair->timeout) {
582 fr_strerror_const("Failed creating timeouts list");
583 goto fail;
584 }
585
586 coord_pair->intp = unlang_interpret_init(coord_pair, el,
588 .init_internal = _coord_pair_request_internal_init,
589
590 .done_external = _coord_pair_request_done_external,
591 .done_internal = _coord_pair_request_done_internal,
592 .done_detached = _coord_pair_request_done_detached,
593
597 .mark_runnable = _coord_pair_request_runnable,
598
601 }, coord_pair);
602
603 if (!coord_pair->intp) goto fail;
604
605 if (!(coord_pair->slab = request_slab_list_alloc(coord_pair, el, &coord_pair_reg->reuse, NULL, NULL,
606 coord_pair, true, false))) {
607 goto fail;
608 }
609
610 if (!single_thread) unlang_interpret_set_thread_default(coord_pair->intp);
611
612 return coord_pair;
613}
614
615static inline CC_HINT(always_inline) void coord_run_request(fr_coord_pair_t *coord_pair, fr_time_t start)
616{
617 request_t *request;
618 fr_time_t now;
619
620 now = start;
621
622 while (fr_time_delta_lt(fr_time_sub(now, start), fr_time_delta_from_msec(1)) &&
623 ((request = fr_heap_pop(&coord_pair->runnable)) != NULL)) {
624 REQUEST_VERIFY(request);
625 fr_assert(!fr_heap_entry_inserted(request->runnable));
626
628
629 now = fr_time();
630 }
631}
632
633/*
634 * Pre and post events used in single threaded mode
635 */
636
638{
639 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
640 request_t *request;
641
642 request = fr_heap_peek(coord_pair->runnable);
643 return request ? 1 : 0;
644}
645
647{
648 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
649
650 coord_run_request(coord_pair, fr_time());
651}
652
653/** Event callback in multi threaded mode
654 */
656{
657 fr_coord_pair_t *coord_pair = talloc_get_type_abort(uctx, fr_coord_pair_t);
658
659 coord_run_request(coord_pair, fr_time());
660}
661
662/** Callback run when a coordinator receives pair list data
663 *
664 * Converts the data into a request.
665 */
667{
668 fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
669 fr_coord_pair_t *coord_pair = talloc_get_type_abort(inst, fr_coord_pair_t);
670 coord_pair_request_bootstrap(coord_pair, worker_id, dbuff, now, coord_pair_reg);
671
672 return;
673}
674
675/** Callback run when a worker receives pair list data
676 *
677 * Finds the packet type attribute in the data and calls the callback
678 * registered against the value of that attribute.
679 *
680 * @param cw Worker which received the message.
681 * @param dbuff Data received.
682 * @param now Time the data is received.
683 * @param uctx The coord_pair registration.
684 */
686{
687 fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(uctx, fr_coord_pair_reg_t);
688 fr_pair_list_t list;
689 fr_pair_t *vp;
690
691 fr_pair_list_init(&list);
692 if (fr_internal_decode_list_dbuff(NULL, &list, coord_pair_reg->root, dbuff, NULL) < 0) {
693 PERROR("Failed to decode data as pair list");
694 goto free;
695 }
696
697 vp = fr_pair_find_by_da_nested(&list, NULL, coord_pair_reg->attr_packet_type);
698
699 if (!vp) {
700 ERROR("Message received without %s", coord_pair_reg->attr_packet_type->name);
701 goto free;
702 }
703
704 if (vp->vp_uint32 > coord_pair_reg->max_packet_type || !coord_pair_reg->callbacks[vp->vp_uint32]) {
705 ERROR("Message received with invalid value %pP", vp);
706 goto free;
707 }
708
709 coord_pair_reg->callbacks[vp->vp_uint32]->callback(cw, coord_pair_reg, &list, now,
710 coord_pair_reg->callbacks[vp->vp_uint32]->uctx);
711
712free:
713 fr_pair_list_free(&list);
714}
715
716/** Send a reply list from a coordinator to a worker
717 *
718 * @param request containing the reply to send.
719 * @param worker_id to send the reply to.
720 * @return
721 * - 0 on success
722 * - -1 on failure
723 */
725{
726 fr_dbuff_t dbuff;
727 fr_dbuff_uctx_talloc_t tctx;
728 fr_coord_packet_ctx_t *packet_ctx = talloc_get_type_abort(request->async->packet_ctx, fr_coord_packet_ctx_t);
729 fr_coord_pair_reg_t *coord_pair_reg = talloc_get_type_abort(packet_ctx->uctx, fr_coord_pair_reg_t);
730 int ret;
731
732 if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1;
733 if (fr_internal_encode_list(&dbuff, &request->reply_pairs, NULL) < 0) {
734 fr_dbuff_free_talloc(&dbuff);
735 return -1;
736 }
737
738 ret = fr_coord_to_worker_send(packet_ctx->coord_pair->coord, worker_id, coord_pair_reg->cb_id, &dbuff);
739
740 fr_dbuff_free_talloc(&dbuff);
741
742 return ret;
743}
744
745/** Send a pair list from a worker to a coordinator
746 *
747 * The pair list must include an attribute indicating the packet type
748 *
749 * @param cw The coord worker sending the data.
750 * @param list of pairs to send.
751 * @return
752 * - 0 on success
753 * - -1 on failure
754 */
756{
757 fr_dbuff_t dbuff;
758 fr_dbuff_uctx_talloc_t tctx;
759 int ret;
760
761 if (fr_dbuff_init_talloc(NULL, &dbuff, &tctx, 1024, SIZE_MAX) == NULL) return -1;
762 if (fr_internal_encode_list(&dbuff, list, NULL) < 0) {
763 fr_dbuff_free_talloc(&dbuff);
764 return -1;
765 }
766
767 ret = fr_worker_to_coord_send(cw, coord_pair_reg->cb_id, &dbuff);
768
769 fr_dbuff_free_talloc(&dbuff);
770 return ret;
771}
772
773/** Plugin creation called during coordinator creation.
774 *
775 * @param ctx to allocate the plugin in.
776 * @param el Event list for plugin to use.
777 * @param single_thread is the server in single thread mode.
778 * @param uctx configured for the callback this plugin relates to.
779 * @return
780 * - fr_coord_plugin_t on success
781 * - NULL on failure
782 */
784 bool single_thread, void *uctx)
785{
786 fr_coord_cb_inst_t *cb_inst;
787 fr_coord_pair_t *coord_pair;
788
789 MEM(cb_inst = talloc(ctx, fr_coord_cb_inst_t));
790
791 *cb_inst = (fr_coord_cb_inst_t) {
793 .event_post_cb = fr_coord_pair_post_event,
794 .event_cb = fr_coord_pair_event
795 };
796
797 coord_pair = fr_coord_pair_create(ctx, coord, el, single_thread, uctx);
798 if (!coord_pair) {
799 talloc_free(cb_inst);
800 return NULL;
801 }
802
803 cb_inst->inst_data = coord_pair;
804
805 return cb_inst;
806}
#define RCSID(id)
Definition build.h:488
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER; use it when you don't really care about the ordering and just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:384
#define UNUSED
Definition build.h:318
int cf_section_parse(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs)
Parse a configuration section into user-supplied variables.
Definition cf_parse.c:1207
#define CONF_PARSER_TERMINATOR
Definition cf_parse.h:657
#define cf_section_rules_push(_cs, _rule)
Definition cf_parse.h:689
Defines a CONF_PAIR to C data type mapping.
Definition cf_parse.h:594
Configuration AVP similar to a fr_pair_t.
Definition cf_priv.h:72
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
CONF_PAIR * cf_pair_find(CONF_SECTION const *cs, char const *attr)
Search for a CONF_PAIR with a specific name.
Definition cf_util.c:1419
char const * cf_pair_value(CONF_PAIR const *pair)
Return the value of a CONF_PAIR.
Definition cf_util.c:1574
#define cf_log_err(_cf, _fmt,...)
Definition cf_util.h:285
#define cf_section_alloc(_ctx, _parent, _name1, _name2)
Definition cf_util.h:143
int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
Definition coord.c:691
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
Definition coord.c:746
fr_event_list_t * el
Coordinator event list.
Definition coord.c:49
A coordinator which receives messages from workers.
Definition coord.c:47
The worker end of worker <-> coordinator communication.
Definition coord.c:71
struct fr_coord_cb_inst_s fr_coord_cb_inst_t
Definition coord.h:38
static void coord_pair_request_time_tracking_end(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
End time tracking for a request.
Definition coord_pair.c:301
static int _coord_pair_reg_free(fr_coord_pair_reg_t *to_free)
Remove a coord pair registration from the list when it is freed.
Definition coord_pair.c:91
static void _coord_pair_request_resume(request_t *request, UNUSED void *uctx)
Interpreter is starting to work on request again.
Definition coord_pair.c:514
fr_timer_list_t * timeout
Track when requests timeout using a dlist.
Definition coord_pair.c:65
fr_time_tracking_t tracking
How much time the coordinator has spent doing things.
Definition coord_pair.c:67
static void _coord_pair_request_done_internal(request_t *request, UNUSED rlm_rcode_t rcode, void *uctx)
Internal request (i.e. one generated by the interpreter) is now complete.
Definition coord_pair.c:435
fr_coord_pair_reg_t * coord_pair_reg
Registration details for this coord pair.
Definition coord_pair.c:60
void fr_coord_worker_pair_data_recv(fr_coord_worker_t *cw, fr_dbuff_t *dbuff, fr_time_t now, void *uctx)
Callback run when a worker receives pair list data.
Definition coord_pair.c:685
int fr_coord_to_worker_reply_send(request_t *request, uint32_t worker_id)
Send a reply list from a coordinator to a worker.
Definition coord_pair.c:724
static void _coord_pair_request_internal_init(request_t *request, void *uctx)
Definition coord_pair.c:401
static void _coord_pair_request_yield(request_t *request, UNUSED void *uctx)
Interpreter yielded request.
Definition coord_pair.c:505
int fr_worker_to_coord_pair_send(fr_coord_worker_t *cw, fr_coord_pair_reg_t *coord_pair_reg, fr_pair_list_t *list)
Send a pair list from a worker to a coordinator.
Definition coord_pair.c:755
static int _coord_pair_request_deinit(request_t *request, UNUSED void *uctx)
Definition coord_pair.c:334
fr_time_delta_t predicted
How long we predict a request will take to execute.
Definition coord_pair.c:66
uint32_t fr_coord_pair_reg_cb_id(fr_coord_pair_reg_t *coord_pair_reg)
Return the coordinator callback ID associated with a coord_pair_reg_t.
Definition coord_pair.c:220
void * uctx
Source specific ctx.
Definition coord_pair.c:79
static bool _coord_pair_request_scheduled(request_t const *request, UNUSED void *uctx)
Check if a request is scheduled.
Definition coord_pair.c:523
static int coord_pair_request_time_tracking_start(fr_coord_pair_t *coord_pair, request_t *request, fr_time_t now)
Start time tracking for a request, and mark it as runnable.
Definition coord_pair.c:278
fr_coord_pair_reg_t * fr_coord_pair_register(TALLOC_CTX *ctx, fr_coord_pair_reg_ctx_t *reg_ctx)
Register a set of callbacks for pair list based coordinator messages.
Definition coord_pair.c:113
fr_coord_t * coord
Coordinator which this coord pair is attached to.
Definition coord_pair.c:59
static void fr_coord_pair_event(UNUSED fr_event_list_t *el, void *uctx)
Event callback in multi threaded mode.
Definition coord_pair.c:655
static void coord_pair_request_name_number(request_t *request)
Definition coord_pair.c:327
static void _coord_pair_request_detach(request_t *request, void *uctx)
Make us responsible for running the request.
Definition coord_pair.c:465
fr_coord_pair_t * coord_pair
Coordinator pair this packet is for.
Definition coord_pair.c:78
unlang_interpret_t * intp
Interpreter for running requests.
Definition coord_pair.c:62
static int8_t coord_pair_runnable_cmp(void const *one, void const *two)
Compare two requests by priority and sequence.
Definition coord_pair.c:546
static void _coord_pair_request_prioritise(request_t *request, void *uctx)
Update a request's priority.
Definition coord_pair.c:531
fr_coord_cb_inst_t * fr_coord_pair_inst_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, bool single_thread, void *uctx)
Plugin creation called during coordinator creation.
Definition coord_pair.c:783
static int fr_coord_pair_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Definition coord_pair.c:637
fr_heap_t * runnable
Current runnable requests.
Definition coord_pair.c:63
static int fr_coord_pair_request_timeout_set(fr_coord_pair_t *coord_pair, request_t *request, fr_time_delta_t timeout)
Set, or re-set the request timer.
Definition coord_pair.c:265
static void _coord_pair_request_done_detached(request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
Detached request (i.e. one generated by the interpreter with no parent) is now complete.
Definition coord_pair.c:451
static void coord_pair_request_init(fr_event_list_t *el, request_t *request, fr_time_t now, void *packet_ctx)
Definition coord_pair.c:313
static void coord_pair_request_bootstrap(fr_coord_pair_t *coord_pair, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *uctx)
Definition coord_pair.c:339
static void coord_run_request(fr_coord_pair_t *coord_pair, fr_time_t start)
Definition coord_pair.c:615
fr_event_list_t * el
Event list for interpreter.
Definition coord_pair.c:61
static void coord_pair_stop_request(request_t *request)
Signal the unlang interpreter that it needs to stop running the request.
Definition coord_pair.c:235
static const conf_parser_t request_reuse_config[]
Conf parser to read slab settings from module config.
Definition coord_pair.c:84
static void _coord_pair_request_done_external(UNUSED request_t *request, UNUSED rlm_rcode_t rcode, UNUSED void *uctx)
External request is now complete - will never happen with coordinators.
Definition coord_pair.c:426
static fr_coord_pair_t * fr_coord_pair_create(TALLOC_CTX *ctx, fr_coord_t *coord, fr_event_list_t *el, bool single_thread, void *uctx)
Create the coord_pair coord instance data.
Definition coord_pair.c:559
static void _coord_pair_request_timeout(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t when, void *uctx)
Enforce max_request_time.
Definition coord_pair.c:246
uint64_t num_active
Number of active requests.
Definition coord_pair.c:68
void fr_coord_pair_data_recv(UNUSED fr_coord_t *coord, uint32_t worker_id, fr_dbuff_t *dbuff, fr_time_t now, void *inst, void *uctx)
Callback run when a coordinator receives pair list data.
Definition coord_pair.c:666
static void fr_coord_pair_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Definition coord_pair.c:646
request_slab_list_t * slab
slab allocator for request_t
Definition coord_pair.c:69
static void _coord_pair_request_runnable(request_t *request, void *uctx)
Request is now runnable.
Definition coord_pair.c:494
Packet context used when coordinator messages are processed through an interpreter.
Definition coord_pair.c:77
struct fr_coord_pair_s fr_coord_pair_t
Definition coord_pair.h:33
fr_coord_worker_pair_cb_reg_t * worker_cb
Callbacks for coordinator -> worker pair messages.
Definition coord_pair.h:44
uint32_t packet_type
Packet type value for this callback.
Definition coord_pair.h:38
fr_time_delta_t max_request_time
Maximum time for coordinator request processing.
Definition coord_pair.h:48
fr_dict_attr_t const * root
Root attribute for decoding pair list messages.
Definition coord_pair.h:45
CONF_SECTION * cs
Module conf section.
Definition coord_pair.h:47
fr_coord_worker_pair_cb_t callback
Function to call.
Definition coord_pair.h:39
struct fr_coord_pair_reg_s fr_coord_pair_reg_t
Definition coord_pair.h:32
uint32_t cb_id
Coordinator callback id used for pair list messages.
Definition coord_pair.h:46
void * inst_data
Instance data.
Definition coord_priv.h:45
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
Definition coord_priv.h:46
static void fr_dbuff_free_talloc(fr_dbuff_t *dbuff)
Free the talloc buffer associated with a dbuff.
Definition dbuff.h:461
static fr_dbuff_t * fr_dbuff_init_talloc(TALLOC_CTX *ctx, fr_dbuff_t *dbuff, fr_dbuff_uctx_talloc_t *tctx, size_t init, size_t max)
Initialise a special dbuff which automatically extends as additional data is written.
Definition dbuff.h:419
#define fr_assert_msg(_x, _msg,...)
Calls panic_action ifndef NDEBUG, else logs error and causes the server to exit immediately with code...
Definition debug.h:212
#define MEM(x)
Definition debug.h:46
static fr_dict_attr_t const * attr_packet_type
Definition dhcpclient.c:88
#define ERROR(fmt,...)
Definition dhcpclient.c:40
fr_dict_attr_t const * fr_dict_attr_by_name(fr_dict_attr_err_t *err, fr_dict_attr_t const *parent, char const *attr)
Locate a fr_dict_attr_t by its name.
Definition dict_util.c:3528
fr_dict_attr_t const * fr_dict_root(fr_dict_t const *dict)
Return the root attribute of a dictionary.
Definition dict_util.c:2665
fr_dict_t const * fr_dict_internal(void)
Definition dict_util.c:4928
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static bool fr_dlist_entry_in_list(fr_dlist_t const *entry)
Check if a list entry is part of a list.
Definition dlist.h:145
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
static void fr_dlist_entry_init(fr_dlist_t *entry)
Initialise a linked list without metadata.
Definition dlist.h:120
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
int fr_heap_insert(fr_heap_t **hp, void *data)
Insert a new element into the heap.
Definition heap.c:146
void * fr_heap_pop(fr_heap_t **hp)
Remove a node from the heap.
Definition heap.c:325
int fr_heap_extract(fr_heap_t **hp, void *data)
Remove a node from the heap.
Definition heap.c:239
static void * fr_heap_peek(fr_heap_t *h)
Return the item from the top of the heap but don't pop it.
Definition heap.h:136
static bool fr_heap_entry_inserted(fr_heap_index_t heap_idx)
Check if an entry is inserted into a heap.
Definition heap.h:124
#define fr_heap_talloc_alloc(_ctx, _cmp, _talloc_type, _field, _init)
Creates a heap that verifies elements are of a specific talloc type.
Definition heap.h:115
The main heap structure.
Definition heap.h:66
free(array)
talloc_free(hp)
rlm_rcode_t unlang_interpret(request_t *request, bool running)
Run the interpreter for a current request.
Definition interpret.c:940
void unlang_interpret_set(request_t *request, unlang_interpret_t *intp)
Set a specific interpreter for a request.
Definition interpret.c:2034
void unlang_interpret_set_thread_default(unlang_interpret_t *intp)
Set the default interpreter for this thread.
Definition interpret.c:2065
unlang_interpret_t * unlang_interpret_init(TALLOC_CTX *ctx, fr_event_list_t *el, unlang_request_func_t *funcs, void *uctx)
Initialize an unlang compiler / interpreter.
Definition interpret.c:1993
void unlang_interpret_signal(request_t *request, fr_signal_t action)
Send a signal (usually stop) to a request.
Definition interpret.c:1417
#define UNLANG_REQUEST_RESUME
Definition interpret.h:43
#define UNLANG_TOP_FRAME
Definition interpret.h:36
External functions provided by the owner of the interpreter.
Definition interpret.h:110
Minimal data structure to use the new code.
Definition listen.h:63
#define PERROR(_fmt,...)
Definition log.h:228
#define RDEBUG3(fmt,...)
Definition log.h:355
#define RERROR(fmt,...)
Definition log.h:310
#define RPEDEBUG(fmt,...)
Definition log.h:388
#define RDEBUG4(fmt,...)
Definition log.h:356
#define fr_time()
Definition event.c:60
Stores all information relating to an event list.
Definition event.c:377
fr_packet_t * fr_packet_alloc(TALLOC_CTX *ctx, bool new_vector)
Allocate a new fr_packet_t.
Definition packet.c:38
main_config_t const * main_config
Main server configuration.
Definition main_config.c:58
fr_worker_config_t worker
Worker thread configuration.
Definition main_config.h:61
unsigned int uint32_t
int fr_pair_append_by_da(TALLOC_CTX *ctx, fr_pair_t **out, fr_pair_list_t *list, fr_dict_attr_t const *da)
Allocate a new fr_pair_t and append it to the list.
Definition pair.c:1471
fr_pair_t * fr_pair_find_by_da_nested(fr_pair_list_t const *list, fr_pair_t const *prev, fr_dict_attr_t const *da)
Find a pair with a matching fr_dict_attr_t, by walking the nested fr_dict_attr_t tree.
Definition pair.c:784
fr_pair_t * fr_pair_find_by_da(fr_pair_list_t const *list, fr_pair_t const *prev, fr_dict_attr_t const *da)
Find the first pair with a matching da.
Definition pair.c:707
void fr_pair_list_init(fr_pair_list_t *list)
Initialise a pair list header.
Definition pair.c:46
ssize_t fr_internal_decode_list_dbuff(TALLOC_CTX *ctx, fr_pair_list_t *out, fr_dict_attr_t const *parent, fr_dbuff_t *dbuff, void *decode_ctx)
Retrieve all pairs from the dbuff.
Definition decode.c:314
ssize_t fr_internal_encode_list(fr_dbuff_t *dbuff, fr_pair_list_t const *list, void *encode_ctx)
Encode a list of pairs using the internal encoder.
Definition encode.c:303
#define fr_assert(_expr)
Definition rad_assert.h:37
#define REDEBUG(fmt,...)
rlm_rcode_t
Return codes indicating the result of the module call.
Definition rcode.h:44
@ RLM_MODULE_TIMEOUT
Module (or section) timed out.
Definition rcode.h:56
int request_slab_deinit(request_t *request)
Callback for slabs to deinitialise the request.
Definition request.c:383
int request_detach(request_t *child)
Unlink a subrequest from its parent.
Definition request.c:542
#define REQUEST_VERIFY(_x)
Definition request.h:309
#define request_is_detached(_x)
Definition request.h:186
#define REQUEST_POOL_HEADERS
Definition request.h:66
#define request_is_internal(_x)
Definition request.h:185
@ REQUEST_TYPE_INTERNAL
A request generated internally.
Definition request.h:179
#define request_is_detachable(_x)
Definition request.h:187
#define request_init(_ctx, _type, _args)
Definition request.h:321
#define REQUEST_POOL_SIZE
Definition request.h:79
Optional arguments for initialising requests.
Definition request.h:287
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:103
A list of modules.
Definition module.h:407
@ FR_SIGNAL_CANCEL
Request has been cancelled.
Definition signal.h:40
#define FR_SLAB_FUNCS(_name, _type)
Define type specific wrapper functions for slabs and slab elements.
Definition slab.h:124
#define FR_SLAB_TYPES(_name, _type)
Define type specific wrapper structs for slabs and slab elements.
Definition slab.h:75
#define FR_SLAB_CONFIG_CONF_PARSER
conf_parser_t entries to populate user configurable slab values
Definition slab.h:35
Tuneable parameters for slabs.
Definition slab.h:42
module_list_t * module_list_alloc(TALLOC_CTX *ctx, module_list_type_t const *type, char const *name, bool write_protect)
Allocate a new module list.
Definition module.c:1883
module_list_type_t const module_list_type_global
Callbacks for a global module list.
Definition module.c:533
eap_aka_sim_process_conf_t * inst
fr_pair_t * vp
@ memory_order_seq_cst
Definition stdatomic.h:132
#define atomic_fetch_add_explicit(object, operand, order)
Definition stdatomic.h:302
#define _Atomic(T)
Definition stdatomic.h:77
Stores an attribute, a value and various bits of other data.
Definition pair.h:68
static int talloc_const_free(void const *ptr)
Free const'd memory.
Definition talloc.h:253
#define talloc_asprintf
Definition talloc.h:144
static fr_time_delta_t fr_time_delta_from_msec(int64_t msec)
Definition time.h:575
#define fr_time_delta_lt(_a, _b)
Definition time.h:285
#define fr_time_delta_eq(_a, _b)
Definition time.h:287
#define fr_time_sub(_a, _b)
Subtract one time from another.
Definition time.h:229
A time delta, a difference in time measured in nanoseconds.
Definition time.h:80
"server local" time.
Definition time.h:69
@ FR_TIME_TRACKING_YIELDED
We're currently tracking time in the yielded state.
static void fr_time_tracking_yield(fr_time_tracking_t *tt, fr_time_t now)
Transition to the yielded state, recording the time we just spent running.
static void fr_time_tracking_end(fr_time_delta_t *predicted, fr_time_tracking_t *tt, fr_time_t now)
End time tracking for this entity.
static void fr_time_tracking_start(fr_time_tracking_t *parent, fr_time_tracking_t *tt, fr_time_t now)
Start time tracking for a tracked entity.
static void fr_time_tracking_resume(fr_time_tracking_t *tt, fr_time_t now)
Track that a request resumed.
fr_timer_list_t * fr_timer_list_ordered_alloc(TALLOC_CTX *ctx, fr_timer_list_t *parent)
Allocate a new sorted event timer list.
Definition timer.c:1290
An event timer list.
Definition timer.c:49
#define fr_timer_in(...)
Definition timer.h:87
static bool fr_timer_armed(fr_timer_t *ev)
Definition timer.h:120
static fr_event_list_t * el
void fr_pair_list_free(fr_pair_list_t *list)
Free memory used by a valuepair list.
#define fr_strerror_const(_msg)
Definition strerror.h:223
unlang_action_t virtual_server_push(unlang_result_t *p_result, request_t *request, virtual_server_t const *vs, bool top_frame)
Set the request processing function.
virtual_server_t const * virtual_server_find(char const *name)
Return virtual server matching the specified name.
CONF_SECTION * virtual_server_cs(virtual_server_t const *vs)
Return the configuration section for a virtual server.
fr_dict_t const * virtual_server_dict_by_cs(CONF_SECTION const *server_cs)
Return the namespace for the virtual server specified by a config section.
fr_dict_attr_t const * virtual_server_packet_type_by_cs(CONF_SECTION const *server_cs)
Return the packet type attribute for a virtual server specified by a config section.
fr_time_delta_t max_request_time
maximum time a request can be processed
Definition worker.h:76