The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
coord.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 7fc62cb27893f4f8957139302e4609b18b2565cf $
19 *
20 * @brief Coordination thread management
21 * @file io/coord.c
22 *
23 * @copyright 2026 Network RADIUS SAS (legal@networkradius.com)
24 */
25RCSID("$Id: 7fc62cb27893f4f8957139302e4609b18b2565cf $")
26
27#include <freeradius-devel/io/listen.h>
28#include <freeradius-devel/io/schedule.h>
29#include <freeradius-devel/io/thread.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/unlang/base.h>
32#include <freeradius-devel/util/syserror.h>
33
34#include <stdalign.h>
35
36#define FR_CONTROL_ID_COORD_WORKER_ATTACH (1) //!< Message sent from worker to attach to a coordinator
37#define FR_CONTROL_ID_COORD_WORKER_DETACH (2) //!< Message sent from worker to detach from a coordinator
38#define FR_CONTROL_ID_COORD_WORKER_ACK (3) //!< Message sent to worker to acknowledge attach / detach
39#define FR_CONTROL_ID_COORD_DATA (4) //!< Worker <-> coordinator message to pass data to a callback
40
44
45/** A coordinator which receives messages from workers
46 */
47struct fr_coord_s {
48 fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
49 fr_event_list_t *el; //!< Coordinator event list.
50 fr_rb_node_t node; //!< Entry in the tree of coordinators.
51 fr_coord_cb_reg_t *callbacks; //!< Array of callbacks for worker -> coordinator messages.
                                 //!< Set to the registration's coord_cb array.
52 uint32_t num_callbacks; //!< Number of callbacks defined, counted up to the first
                           //!< entry whose callback is NULL.
53 fr_coord_cb_inst_t **cb_inst; //!< Array of callback instance specific data, indexed by
                                 //!< callback ID.  An entry is NULL when the callback has no
                                 //!< inst_create function.
54
55 uint32_t max_workers; //!< Maximum number of workers we expect.  Sizes the per-worker
                         //!< coord_send_* arrays below.
56 uint32_t num_workers; //!< How many workers are attached.  Incremented on attach,
                         //!< decremented on detach.
57
58 fr_control_t *coord_recv_control; //!< Control plane for worker -> coordinator messages.
59 fr_atomic_queue_t *coord_recv_aq; //!< Atomic queue for worker -> coordinator data.
60 fr_ring_buffer_t **coord_send_rb; //!< Ring buffers for coordinator -> worker control messages,
                                     //!< indexed by worker ID.
61 fr_control_t **coord_send_control; //!< Control planes for coordinator -> worker messages, indexed
                                      //!< by worker ID.  An entry is NULL unless that worker is attached.
62 fr_message_set_t **coord_send_ms; //!< Message sets for coordinator -> worker data, indexed by worker ID.
63 fr_atomic_queue_t **coord_send_aq; //!< Atomic queues for coordinator -> worker data, indexed by
                                      //!< worker ID.  An entry is NULL unless that worker is attached.
64
65 bool exiting; //!< Is this coordinator shutting down.  Set when a detaching
                 //!< worker reports that the server is exiting.
66 bool single_thread; //!< Are we in single thread mode.
67};
68
69/** The worker end of worker <-> coordinator communication.
70 */
72 fr_coord_t *coord; //!< Coordinator this worker is related to
73 fr_ring_buffer_t *worker_send_rb; //!< Ring buffer for worker -> coordinator control plane
74 fr_message_set_t *worker_send_ms; //!< Message set for worker -> coordinator messages
75 fr_control_t *worker_recv_control; //!< Coordinator -> worker control plane
76 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue for coordinator -> worker messages
77 fr_coord_worker_cb_reg_t *callbacks; //!< Callbacks for coordinator -> worker messages
78 uint32_t num_callbacks; //!< Number of callbacks registered.
79};
80
81/** A coordinator registration
82 */
84 char const *name; //!< Name for debugging.
85 fr_dlist_t entry; //!< Entry in list of registrations.
86 fr_coord_cb_reg_t *coord_cb; //!< Callbacks for worker -> coordinator messages.
87 fr_coord_worker_cb_reg_t *worker_cb; //!< Callbacks for coordinator -> worker messages.
88 size_t worker_send_size; //!< Initial size for worker -> coordinator ring buffer.
89 size_t coord_send_size; //!< Initial size for coordinator -> worker ring buffer.
90};
91
92/** Scheduler specific information for coordinator threads
93 */
94typedef struct {
95 fr_thread_t thread; //!< common thread information - must be first!
96
97 uint32_t max_workers; //!< Maximum number of workers which will connect to this coordinator.
98 fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
99 fr_coord_t *coord; //!< The coordinator data structure.
100 fr_sem_t *sem; //!< For inter-thread signaling.
102
103/** Control plane message used for workers attaching / detaching to coordinators
104 */
105typedef struct {
106 uint32_t worker; //!< Worker ID
107 fr_control_t *worker_recv_control; //!< Control plane to send messages to this worker
108 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue to send data to this worker
110
111typedef struct {
112 uint32_t worker; //!< Worker ID
113 bool exiting; //!< Is the server exiting
115
116/** Compare coordinators by registration
117 */
118static int8_t coord_cmp(void const *one, void const *two)
119{
120 fr_coord_t const *a = one, *b = two;
121
122 return CMP(a->coord_reg, b->coord_reg);
123}
124
125/** Register a coordinator
126 *
127 * To be called from mod_instantiate of a module which uses a coordinator
128 *
129 * @param ctx to allocate registration under
130 * @param reg_ctx Registration data
131 * @return
132 * - coordination registration on success
133 * - NULL on failure
134 */
136{
137 fr_coord_reg_t *coord_reg;
138
139 /* Allocate the list of registered coordinators if not already done */
140 if (!coord_regs) {
141 MEM(coord_regs = talloc_zero(NULL, fr_dlist_head_t));
143 }
144
145 MEM(coord_reg = talloc(ctx, fr_coord_reg_t));
146 *coord_reg = (fr_coord_reg_t) {
147 .name = reg_ctx->name,
148 .coord_cb = reg_ctx->coord_cb,
149 .worker_cb = reg_ctx->worker_cb,
150 .worker_send_size = reg_ctx->worker_send_size ? reg_ctx->worker_send_size : 4096,
151 .coord_send_size = reg_ctx->coord_send_size ? reg_ctx->coord_send_size : 4096,
152 };
153
155
156 return coord_reg;
157}
158
159/** De-register a coordinator
160 *
161 * To be called from mod_detach of a module which uses a coordinator
162 *
163 * When running in threaded mode, will wait for the coordinator to exit.
164 *
165 * @param coord_reg to de-register
166 */
168{
169 int ret;
170
171 fr_dlist_remove(coord_regs, coord_reg);
172
173 /*
174 * In single threaded mode just free the registration.
175 */
176 if (!coord_threads) goto free;
177
179 if (sc->coord_reg == coord_reg) {
180 if ((ret = pthread_join(sc->thread.pthread_id, NULL)) != 0) {
181 ERROR("Failed joining coordinator %s: %s", coord_reg->name, fr_syserror(ret));
182 } else {
183 DEBUG2("Coordinator %s joined (cleaned up)", coord_reg->name);
184 }
185
188 break;
189 }
190 }
191
192free:
193 talloc_free(coord_reg);
194
195 if (fr_dlist_num_elements(coord_regs) == 0) TALLOC_FREE(coord_regs);
196}
197
198/** Callback for a coordinator receiving data from a worker
 *
 * The control message only identifies the sending worker.  The payload is
 * popped from the coordinator's data atomic queue and passed, wrapped in a
 * dbuff, to the callback selected by the payload's coord_cb_id.
 *
 * @param[in] ctx		The coordinator (fr_coord_t).
 * @param[in] data		Control message, copied into the local cm.
 * @param[in] data_size		Size of the control message.
 * @param[in] now		Time passed through to the callback.
199 */
200static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
201{
202 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
204 fr_coord_data_t *cd;
205 fr_dbuff_t dbuff;
206
    /*
     * NOTE(review): the size is only checked by assertion, and the copy
     * length is data_size rather than sizeof(cm) - confirm this is safe
     * when assertions are compiled out.
     */
207 fr_assert(data_size == sizeof(cm));
208 memcpy(&cm, data, data_size);
209
    /* Nothing queued for this control message, so there is nothing to do. */
210 if (unlikely(!fr_atomic_queue_pop(coord->coord_recv_aq, (void **)&cd))) return;
211
212 DEBUG3("Coordinator %s got data from worker %d for callback %d",
213 coord->coord_reg->name, cm.worker, cd->coord_cb_id);
214
    /* Reject callback IDs beyond the registered callbacks, releasing the message. */
215 if (cd->coord_cb_id >= coord->num_callbacks) {
216 ERROR("Received data for callback %d which is not defined", cd->coord_cb_id);
217 fr_message_done(&cd->m);
218 return;
219 }
220
    /*
     * The callback gets its instance data when an instance exists for this
     * callback ID, otherwise NULL.  The message is released once it returns.
     */
221 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
222 coord->callbacks[cd->coord_cb_id].callback(coord, cm.worker, &dbuff, now,
223 coord->cb_inst[cd->coord_cb_id] ?
224 coord->cb_inst[cd->coord_cb_id]->inst_data : NULL,
225 coord->callbacks[cd->coord_cb_id].uctx);
226 fr_message_done(&cd->m);
227}
228
229/** Callback for a worker receiving data from a coordinator
 *
 * Mirrors the coordinator side: the payload is popped from the worker's
 * atomic queue and passed, wrapped in a dbuff, to the worker callback
 * selected by the payload's coord_cb_id.
 *
 * @param[in] ctx		The worker end of the coordinator (fr_coord_worker_t).
 * @param[in] data		Control message, copied into the local cm.
 * @param[in] data_size		Size of the control message.
 * @param[in] now		Time passed through to the callback.
230 */
231static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
232{
233 fr_coord_worker_t *cw = talloc_get_type_abort(ctx, fr_coord_worker_t);
235 fr_coord_data_t *cd;
236 fr_dbuff_t dbuff;
237
    /*
     * NOTE(review): the size is only checked by assertion, and the copy
     * length is data_size rather than sizeof(cm) - confirm this is safe
     * when assertions are compiled out.
     */
238 fr_assert(data_size == sizeof(cm));
239 memcpy(&cm, data, data_size);
240
    /* Nothing queued for this control message, so there is nothing to do. */
241 if (unlikely(!fr_atomic_queue_pop(cw->worker_recv_aq, (void **)&cd))) return;
242
243 DEBUG3("Coordinator %s sent message for callback %d", cw->coord->coord_reg->name, cd->coord_cb_id);
244
    /* Reject callback IDs beyond the registered callbacks, releasing the message. */
245 if (cd->coord_cb_id >= cw->num_callbacks) {
246 ERROR("Received message for callback %d which is not defined", cd->coord_cb_id);
247 fr_message_done(&cd->m);
248 return;
249 }
250
    /* Run the callback on the payload, then release the message. */
251 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
252 cw->callbacks[cd->coord_cb_id].callback(cw, &dbuff, now, cw->callbacks[cd->coord_cb_id].uctx);
253 fr_message_done(&cd->m);
254}
255
256/** Callback run by a coordinator when a worker attaches
 *
 * Records the control plane and atomic queue the worker supplied, so the
 * coordinator can send to that worker, then acknowledges the attach.
 *
 * @param[in] ctx		The coordinator (fr_coord_t).
 * @param[in] data		Attach message from the worker.
 * @param[in] data_size		Size of the message, only checked by assertion.
 * @param[in] now		Unused.
257 */
258static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
259{
260 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
262 fr_coord_msg_t ack;
263
    /*
     * NOTE(review): the worker ID indexes the per-worker arrays below and
     * is only range checked by assertion.
     */
264 fr_assert(data_size == sizeof(fr_coord_worker_attach_msg_t));
265 fr_assert(msg->worker < coord->max_workers);
266
267 DEBUG2("Worker %d attached to %s", msg->worker, coord->coord_reg->name);
268 coord->num_workers++;
269 coord->coord_send_control[msg->worker] = msg->worker_recv_control;
270 coord->coord_send_aq[msg->worker] = msg->worker_recv_aq;
271
    /*
     * Acknowledge over the control plane just recorded.  The result of the
     * send is not checked.
     */
272 ack.worker = msg->worker;
273 fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
274 FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(ack));
275}
276
277/** Callback run by a coordinator when a worker detaches
 *
 * Acknowledges the detach, then forgets the worker's control plane and
 * atomic queue.  If the worker reports that the server is exiting, the
 * coordinator is marked as exiting too.
 *
 * @param[in] ctx		The coordinator (fr_coord_t).
 * @param[in] data		Detach message from the worker.
 * @param[in] data_size		Size of the message, only checked by assertion.
 * @param[in] now		Unused.
278 */
279static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
280{
281 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
283 fr_coord_msg_t ack;
284
    /*
     * NOTE(review): the worker ID indexes the per-worker arrays below and
     * is only range checked by assertion.
     */
285 fr_assert(data_size == sizeof(fr_coord_worker_detach_msg_t));
286 fr_assert(msg->worker < coord->max_workers);
287
288 DEBUG2("Worker %d detached from %s", msg->worker, coord->coord_reg->name);
289 coord->num_workers--;
290
    /*
     * The ack must be sent before the entries are cleared, as the send
     * uses this worker's control plane pointer.
     */
291 ack.worker = msg->worker;
292 fr_control_message_send(coord->coord_send_control[msg->worker], coord->coord_send_rb[msg->worker],
294
295 coord->coord_send_control[msg->worker] = NULL;
296 coord->coord_send_aq[msg->worker] = NULL;
297 if (msg->exiting) coord->exiting = true;
298}
299
300/** Create a coordinator from its registration
301 *
 * Sets up the receiving control plane and data queue, registers the attach,
 * detach and data callbacks, allocates the per-worker sending resources and
 * creates instance data for each callback which has an inst_create function.
 *
 * Every failure path frees the coordinator itself; all the allocations
 * visible here are made with the coordinator as their talloc parent.
 *
302 * @param ctx to allocate the coordinator in
303 * @param el Event list to run this coordinator
304 * @param coord_reg Registration to configure this coordinator
305 * @param single_thread Is the server in single thread mode
306 * @param max_workers The maximum number of workers which will attach
307 * @return
308 * - the coordinator on success
309 * - NULL on failure
310 */
311static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg,
312 bool single_thread, uint32_t max_workers)
313{
314 fr_coord_t *coord;
315 uint32_t i;
316 fr_coord_cb_reg_t *cb = coord_reg->coord_cb;
318
    /* The compound literal zeroes every field which is not named. */
319 MEM(coord = talloc(ctx, fr_coord_t));
320 *coord = (fr_coord_t) {
321 .el = el,
322 .coord_reg = coord_reg,
323 .single_thread = single_thread,
324 .max_workers = max_workers
325 };
326
327 /* Allocate atomic queue / control for receiving messages from workers */
329 if (!aq) {
330 fr_strerror_const("Failed creating worker -> coordinator atomic queue");
    /* Shared failure exit: later error paths jump here. */
331 fail:
332 talloc_free(coord);
333 return NULL;
334 }
335 coord->coord_recv_control = fr_control_create(coord, el, aq, 5);
336 if (!coord->coord_recv_control) {
337 fr_strerror_const("Failed creating worker -> coordinator control plane");
338 goto fail;
339 }
340
341 /* Allocate atomic queue for workers sending data to coordinators */
343 if (!coord->coord_recv_aq) {
344 fr_strerror_const("Failed creating worker -> coordinator data atomic queue");
345 goto fail;
346 }
347
349 coord, coord_worker_attach) < 0) goto fail;
351 coord, coord_worker_detach) < 0) goto fail;
353 coord, coord_data_recv) < 0) goto fail;
354
355 /* Count the number of callbacks defined, for sanity checking messages */
    /*
     * The array ends at the first entry with a NULL callback.
     * NOTE(review): cb is dereferenced unconditionally, so the
     * registration's coord_cb must not be NULL - confirm callers
     * always provide it.
     */
356 while (cb->callback) {
357 coord->num_callbacks++;
358 cb++;
359 }
360 coord->callbacks = coord_reg->coord_cb;
361
362 if (fr_control_open(coord->coord_recv_control) < 0) {
363 fr_strerror_const("Failed opening control plane");
364 goto fail;
365 }
366
    /*
     * One ring buffer and one message set per possible worker.  These
     * arrays are not zeroed on allocation; presumably each element is
     * assigned inside the loop before it is checked.
     */
367 MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, coord->max_workers));
368 MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, coord->max_workers));
369 for (i = 0; i < coord->max_workers; i++) {
371 if (!coord->coord_send_rb[i]) goto fail;
372
374 coord_reg->coord_send_size, true);
375 if (!coord->coord_send_ms[i]) goto fail;
376 }
    /* Zeroed, so each entry is NULL until the matching worker attaches. */
377 MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, coord->max_workers));
378 MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, coord->max_workers));
379
380 MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
381
    /*
     * Instance data is optional: callbacks without inst_create keep a
     * NULL entry.  A failing inst_create fails the whole coordinator.
     */
382 for (i = 0; i < coord->num_callbacks; i++) {
383 if (!coord->callbacks[i].inst_create) continue;
384 coord->cb_inst[i] = coord->callbacks[i].inst_create(coord, coord, coord->el, coord->single_thread,
385 coord->callbacks[i].uctx);
386 if (!coord->cb_inst[i]) goto fail;
387 }
388
389 return coord;
390}
391
392/** Run the event loop for a coordinator thread when in multi-threaded mode
393 */
394static void fr_coordinate(fr_coord_t *coord)
395{
396 uint32_t i;
397 fr_coord_cb_inst_t *cb_inst;
398
399 /*
400 * Run until we're told to exit AND the number of
401 * workers has dropped to zero.
402 *
403 * Whenever a worker detaches, coord->num_workers
404 * is decremented, so when coord->num_workers == 0,
405 * all workers have detached and are no longer using
406 * the channel.
407 */
408 while (likely(!(coord->exiting && (coord->num_workers == 0)))) {
409 int num_events;
410
411 /*
412 * Check the event list. If there's an error
413 * (e.g. exit), we stop looping and clean up.
414 */
415 DEBUG4("Gathering events");
416 num_events = fr_event_corral(coord->el, fr_time(), true);
417 DEBUG4("%u event(s) pending%s",
418 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
419 if (num_events < 0) break;
420
421 /*
422 * Service outstanding events.
423 */
424 if (num_events > 0) {
425 DEBUG4("Servicing event(s)");
426 fr_event_service(coord->el);
427 }
428
429 /*
430 * Run any registered instance specific event callbacks
431 */
432 for (i = 0; i < coord->num_callbacks; i++) {
433 cb_inst = coord->cb_inst[i];
434 if (cb_inst && cb_inst->event_cb) cb_inst->event_cb(coord->el, cb_inst->inst_data);
435 }
436 }
437
438 return;
439}
440
441/** Entry point for a coordinator thread
 *
 * Sets up the thread, creates the coordinator from its registration,
 * instantiates thread specific data, signals that the thread has started
 * and then runs the coordinator's event loop until it finishes.
 *
 * @param[in] arg	Scheduler data for this coordinator (fr_schedule_coord_t).
 * @return Always NULL; the outcome is reported through fr_thread_exit().
442 */
443static void *fr_coordinate_thread(void *arg)
444{
445 fr_schedule_coord_t *sc = talloc_get_type_abort(arg, fr_schedule_coord_t);
446 fr_coord_reg_t *coord_reg = sc->coord_reg;
448 char coordinate_name[64];
449
450 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinate %s", coord_reg->name);
451
452 if (fr_thread_setup(&sc->thread, coordinate_name) < 0) goto fail;
453
    /* false: this coordinator is not running in single thread mode. */
454 sc->coord = fr_coord_create(sc->thread.ctx, sc->thread.el, coord_reg, false, sc->max_workers);
455 if (!sc->coord) {
456 PERROR("%s - Failed creating coordinator thread", coordinate_name);
457 goto fail;
458 }
459
460 /*
461 * Create all the thread specific data for the coordinator thread
462 */
463 if (fr_thread_instantiate(sc->thread.ctx, sc->thread.el) < 0) goto fail;
464
465 /*
466 * Tell the originator that the thread has started.
467 */
468 fr_thread_start(&sc->thread, sc->sem);
469
470 fr_coordinate(sc->coord);
471
    /*
     * Only reached after the event loop returns.  The failure paths above
     * skip this assignment, so they exit with whatever value status was
     * given at its declaration (not visible here).
     */
472 status = FR_THREAD_EXITED;
473
474fail:
476
477 fr_thread_exit(&sc->thread, status, sc->sem);
478
479 return NULL;
480}
481
482/** Start all registered coordinator threads in multi-threaded mode
483 *
484 * @param num_workers The number of workers which will be attaching
485 * @param sem Semaphore used to signal that the threads are ready
486 * @return
487 * - 0 on success
488 * - -1 on failure
489 */
491{
492 int num = 0;
493
494 if (!coord_regs) return 0;
495
496 MEM(coord_threads = talloc(NULL, fr_dlist_head_t));
499
502
503 MEM(sc = talloc_zero(coord_threads, fr_schedule_coord_t));
504
505 sc->thread.id = num++;
506 sc->coord_reg = coord_reg;
507 sc->max_workers = num_workers;
508 sc->sem = sem;
509
510 if (fr_thread_create(&sc->thread.pthread_id, fr_coordinate_thread, sc) < 0) {
512 PERROR("Failed creating coordinator %s", coord_reg->name);
513 return -1;
514 };
515
517 }
518
519 /*
520 * Wait for all the coordinators to start.
521 */
522 if (fr_thread_wait_list(sem, coord_threads) < 0) {
523 ERROR("Failed creating coordinator threads");
524 return -1;
525 }
526
527 /*
528 * Insert the coordinators in the tree
529 */
531 fr_assert(sc->coord);
532 fr_rb_insert(&coords, sc->coord);
533 }
534
535 return 0;
536}
537
538/** Clean up coordinators in single threaded mode
539 */
541{
542 fr_coord_t *coord;
544
545 if (fr_rb_num_elements(&coords) == 0) return;
546
547 while ((coord = fr_rb_iter_init_inorder(&coords, &iter))) {
549 talloc_free(coord);
550 }
551
552 TALLOC_FREE(coord_regs);
553 TALLOC_FREE(coord_threads);
554}
555
556/** Start coordinators in single threaded mode
 *
 * Creates each coordinator on the supplied event list, with room for a
 * single worker, and inserts it into the tree of coordinators.
 *
 * @param[in] ctx	to allocate the coordinators in.
 * @param[in] el	Event list the coordinators will use.
 * @return
 *	- 0 on success, or when no coordinators have been registered.
 *	- -1 if a coordinator could not be created.
557 */
558int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
559{
    /* No registrations means there is nothing to start. */
560 if (!coord_regs) return 0;
561
563
565 char coordinate_name[64];
566 fr_coord_t *coord;
567
568 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinator %s", coord_reg->name);
569
570 INFO("%s - Starting", coordinate_name);
571
    /* true: single thread mode; 1: the maximum number of workers. */
572 coord = fr_coord_create(ctx, el, coord_reg, true, 1);
573 if (!coord) {
574 PERROR("%s - Failed creating coordinator thread", coordinate_name);
    /* Coordinators already inserted into the tree are left in place. */
575 return -1;
576 }
577
    /* NOTE(review): the result of the insert is not checked. */
578 fr_rb_insert(&coords, coord);
579 }
580
581 return 0;
582}
583
584/** Signal a coordinator that a worker wants to detach
585 *
586 * @param cw Worker which is detaching.
587 * @param exiting Is the server exiting.
 * @return
 *	- 0 on success
 *	- -1 if the detach message could not be sent
588 */
589int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
590{
592
    /*
     * NOTE(review): the allocation result is dereferenced without a check;
     * other allocations in this file are wrapped in MEM().
     */
593 msg = talloc(cw, fr_coord_worker_detach_msg_t);
594 msg->worker = fr_schedule_worker_id();
595 msg->exiting = exiting;
596
599 msg, sizeof(fr_coord_worker_detach_msg_t)) < 0) return -1;
600
602
603 return 0;
604}
605
606/** A worker got an ack from a coordinator in response to attach / detach
 *
 * The visible code takes no action on an ack.  When NDEBUG is not defined
 * the size of the message is asserted.
 *
 * @param[in] ctx		Unused.
 * @param[in] data		The ack message, only read when NDEBUG is not defined.
 * @param[in] data_size		Size of the message, only read when NDEBUG is not defined.
 * @param[in] now		Unused.
607 */
608static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size,
609 UNUSED fr_time_t now)
610{
611#ifndef NDEBUG
612 fr_coord_msg_t const *cm = data;
613
614 fr_assert(data_size == sizeof(fr_coord_msg_t));
616#endif
617}
618
619/** Attach a worker to a coordinator
620 *
621 * @param ctx To allocate worker structure in
622 * @param el Event list for control messages
623 * @param coord_reg Coordinator registration to attach to.
624 * @return
625 * - Worker structure for coordinator use on success
626 * - NULL on failure
627 */
629{
631 fr_coord_worker_cb_reg_t *cb_reg = coord_reg->worker_cb;
633 fr_coord_t find;
635
636 cw = talloc_zero(ctx, fr_coord_worker_t);
637
638 find = (fr_coord_t) {
639 .coord_reg = coord_reg
640 };
641 cw->coord = fr_rb_find(&coords, &find);
642 if (!cw->coord) {
643 fail:
644 talloc_free(cw);
645 return NULL;
646 }
647
648 aq = fr_atomic_queue_alloc(cw, 1024);
653 coord_reg->worker_send_size, true);
654
655 while (cb_reg->callback) {
656 cw->num_callbacks++;
657 cb_reg++;
658 }
659 cw->callbacks = coord_reg->worker_cb;
660
662 cw, coordinate_worker_ack) < 0) goto fail;
664 cw, coord_worker_data_recv) < 0) goto fail;
665
666 if (fr_control_open(cw->worker_recv_control) < 0) goto fail;
667
668 msg.worker_recv_control = cw->worker_recv_control;
669 msg.worker_recv_aq = cw->worker_recv_aq;
670 msg.worker = fr_schedule_worker_id();
671
674 &msg, sizeof(fr_coord_worker_attach_msg_t)) < 0) goto fail;
675
677
678 return cw;
679}
680
681/** Send generic data from a coordinator to a worker
682 *
683 * @param coord Coordinator which is sending the data.
684 * @param worker_id Worker to send data to.
685 * @param cb_id Callback ID for the worker to run.
686 * @param dbuff Buffer containing data to send.
687 * @return
688 * - 0 on success
689 * - <0 on failure
690 */
692{
694 fr_coord_data_t *cd = NULL;
695
696 cm = (fr_coord_msg_t) {
698 };
699
701 fr_dbuff_used(dbuff));
702 if (!cd) return -1;
703
704 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
705 cd->coord_cb_id = cb_id;
706 if (!fr_atomic_queue_push(coord->coord_send_aq[worker_id], cd)) {
708 return -1;
709 }
712 &cm, sizeof(fr_coord_msg_t));
713}
714
715/** Broadcast data from a coordinator to all workers
716 *
717 * @param coord Coordinator which is sending the data.
718 * @param cb_id Callback ID for the workers to run.
719 * @param dbuff Buffer containing data to send.
720 * @return
721 * - 0 on success
722 * - <0 on failure - indicating the number of sends which failed.
723 */
725{
726 uint32_t i;
727 int failed = 0;
728
729 for (i = 0; i < coord->max_workers; i++) {
730 if (!coord->coord_send_control[i]) continue;
731 if (fr_coord_to_worker_send(coord, i, cb_id, dbuff) < 0) failed++;
732 }
733
734 return 0 - failed;
735}
736
737/** Send data from a worker to a coordinator
738 *
739 * @param cw Worker side of coordinator sending the data.
740 * @param cb_id Callback ID for the coordinator to run.
741 * @param dbuff Buffer containing data to send.
742 * @return
743 * - 0 on success
744 * - < 0 on failure
745 */
747{
749 fr_coord_data_t *cd = NULL;
750
751 cm = (fr_coord_msg_t) {
753 };
754
756 if (!cd) return -1;
757
758 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
759 cd->coord_cb_id = cb_id;
762 return -1;
763 }
764
767}
768
769/** Insert instance specific pre-event callbacks
770 */
772{
773 fr_coord_t *coord;
775 fr_coord_cb_inst_t *cb_inst;
776 uint32_t i;
777
778 if (!coord_regs) return 0;
779
780 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
781 coord != NULL;
782 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
783 for (i = 0; i < coord->num_callbacks; i++) {
784 cb_inst = coord->cb_inst[i];
785 if (cb_inst && cb_inst->event_pre_cb &&
786 fr_event_pre_insert(el, cb_inst->event_pre_cb, cb_inst->inst_data) < 0) {
787 return -1;
788 }
789 }
790 }
791 return 0;
792}
793
794/** Insert instance specific post-event callbacks
795 */
797{
798 fr_coord_t *coord;
800 fr_coord_cb_inst_t *cb_inst;
801 uint32_t i;
802
803 if (!coord_regs) return 0;
804
805 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
806 coord != NULL;
807 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
808 for (i = 0; i < coord->num_callbacks; i++) {
809 cb_inst = coord->cb_inst[i];
810 if (cb_inst && cb_inst->event_post_cb &&
811 fr_event_post_insert(el, cb_inst->event_post_cb, cb_inst->inst_data) < 0) {
812 return -1;
813 }
814 }
815 }
816 return 0;
817}
log_entry msg
Definition acutest.h:794
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:488
#define NDEBUG_UNUSED
Definition build.h:329
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:384
#define UNUSED
Definition build.h:318
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static size_t num_workers
static fr_atomic_queue_t ** aq
fr_dlist_t entry
Entry in list of registrations.
Definition coord.c:85
static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker attaches.
Definition coord.c:258
fr_atomic_queue_t * worker_recv_aq
Atomic queue for coordinator -> worker messages.
Definition coord.c:76
uint32_t max_workers
Maximum number of workers which will connect to this coordinator.
Definition coord.c:97
fr_control_t ** coord_send_control
Control planes for coordinator -> worker messages.
Definition coord.c:61
#define FR_CONTROL_ID_COORD_WORKER_DETACH
Message sent from worker to detach from a coordinator.
Definition coord.c:37
static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A worker got an ack from a coordinator in response to attach / detach.
Definition coord.c:608
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:98
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.c:87
int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
Start coordinators in single threaded mode.
Definition coord.c:558
fr_coord_reg_t * fr_coord_register(TALLOC_CTX *ctx, fr_coord_reg_ctx_t *reg_ctx)
Register a coordinator.
Definition coord.c:135
fr_control_t * worker_recv_control
Control plane to send messages to this worker.
Definition coord.c:107
static fr_dlist_head_t * coord_threads
Definition coord.c:42
static fr_coord_t * fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg, bool single_thread, uint32_t max_workers)
Create a coordinator from its registration.
Definition coord.c:311
fr_coord_worker_t * fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg)
Attach a worker to a coordinator.
Definition coord.c:628
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:48
size_t worker_send_size
Initial size for worker -> coordinator ring buffer.
Definition coord.c:88
size_t coord_send_size
Initial size for coordinator -> worker ring buffer.
Definition coord.c:89
uint32_t worker
Worker ID.
Definition coord.c:112
int fr_coord_to_worker_send(fr_coord_t *coord, uint32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
Definition coord.c:691
fr_ring_buffer_t ** coord_send_rb
Ring buffers for coordinator -> worker control messages.
Definition coord.c:60
uint32_t max_workers
Maximum number of workers we expect.
Definition coord.c:55
bool exiting
Is the server exiting.
Definition coord.c:113
fr_atomic_queue_t * worker_recv_aq
Atomic queue to send data to this worker.
Definition coord.c:108
static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a worker receiving data from a coordinator.
Definition coord.c:231
static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker detaches.
Definition coord.c:279
fr_coord_t * coord
Coordinator this worker is related to.
Definition coord.c:72
fr_coord_cb_inst_t ** cb_inst
Array of callback instance specific data.
Definition coord.c:53
void fr_coord_deregister(fr_coord_reg_t *coord_reg)
De-register a coordinator.
Definition coord.c:167
void fr_coords_destroy(void)
Clean up coordinators in single threaded mode.
Definition coord.c:540
static int8_t coord_cmp(void const *one, void const *two)
Compare coordinators by registration.
Definition coord.c:118
fr_control_t * coord_recv_control
Control plane for worker -> coordinator messages.
Definition coord.c:58
#define FR_CONTROL_ID_COORD_WORKER_ATTACH
Message sent from worker to attach to a coordinator.
Definition coord.c:36
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
Definition coord.c:746
static fr_dlist_head_t * coord_regs
Definition coord.c:41
fr_coord_cb_reg_t * callbacks
Array of callbacks for worker -> coordinator messages.
Definition coord.c:51
int fr_coord_post_event_insert(fr_event_list_t *el)
Insert instance specific post-event callbacks.
Definition coord.c:796
int fr_coord_pre_event_insert(fr_event_list_t *el)
Insert instance specific pre-event callbacks.
Definition coord.c:771
uint32_t num_callbacks
Number of callbacks registered.
Definition coord.c:78
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.c:86
fr_atomic_queue_t * coord_recv_aq
Atomic queue for worker -> coordinator.
Definition coord.c:59
static void * fr_coordinate_thread(void *arg)
Entry point for a coordinator thread.
Definition coord.c:443
static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a coordinator receiving data from a worker.
Definition coord.c:200
fr_atomic_queue_t ** coord_send_aq
Atomic queues for coordinator -> worker data.
Definition coord.c:63
uint32_t num_callbacks
Number of callbacks defined.
Definition coord.c:52
bool exiting
Is this coordinator shutting down.
Definition coord.c:65
uint32_t worker
Worker ID.
Definition coord.c:106
fr_rb_node_t node
Entry in the tree of coordinators.
Definition coord.c:50
fr_coord_worker_cb_reg_t * callbacks
Callbacks for coordinator -> worker messages.
Definition coord.c:77
fr_event_list_t * el
Coordinator event list.
Definition coord.c:49
int fr_coord_start(uint32_t num_workers, fr_sem_t *sem)
Start all registered coordinator threads in multi-threaded mode.
Definition coord.c:490
#define FR_CONTROL_ID_COORD_WORKER_ACK
Message sent to worker to acknowledge attach / detach.
Definition coord.c:38
#define FR_CONTROL_ID_COORD_DATA
Worker <-> coordinator message to pass data to a callback.
Definition coord.c:39
fr_message_set_t ** coord_send_ms
Message sets for coordinator -> worker data.
Definition coord.c:62
fr_thread_t thread
common thread information - must be first!
Definition coord.c:95
static void fr_coordinate(fr_coord_t *coord)
Run the event loop for a coordinator thread when in multi-threaded mode.
Definition coord.c:394
fr_control_t * worker_recv_control
Coordinator -> worker control plane.
Definition coord.c:75
bool single_thread
Are we in single thread mode.
Definition coord.c:66
char const * name
Name for debugging.
Definition coord.c:84
int fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff)
Broadcast data from a coordinator to all workers.
Definition coord.c:724
int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
Signal a coordinator that a worker wants to detach.
Definition coord.c:589
fr_ring_buffer_t * worker_send_rb
Ring buffer for worker -> coordinator control plane.
Definition coord.c:73
fr_coord_t * coord
The coordinator data structure.
Definition coord.c:99
static fr_rb_tree_t coords
Definition coord.c:43
uint32_t num_workers
How many workers are attached.
Definition coord.c:56
fr_message_set_t * worker_send_ms
Message set for worker -> coordinator messages.
Definition coord.c:74
fr_sem_t * sem
For inter-thread signaling.
Definition coord.c:100
A coordinator registration.
Definition coord.c:83
A coordinator which receives messages from workers.
Definition coord.c:47
Control plane message used for workers attaching / detaching to coordinators.
Definition coord.c:105
The worker end of worker <-> coordinator communication.
Definition coord.c:71
Scheduler specific information for coordinator threads.
Definition coord.c:94
fr_coord_cb_t callback
Definition coord.h:46
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.h:61
struct fr_coord_s fr_coord_t
Definition coord.h:36
void * uctx
Definition coord.h:48
char const * name
Name for this coordinator.
Definition coord.h:60
fr_coord_worker_cb_t callback
Definition coord.h:53
size_t coord_send_size
Initial ring buffer size for coordinator -> worker data.
Definition coord.h:65
struct fr_coord_reg_s fr_coord_reg_t
Definition coord.h:34
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.h:62
size_t worker_send_size
Initial ring buffer size for worker -> coordinator data.
Definition coord.h:63
fr_coord_cb_inst_create_t inst_create
Definition coord.h:47
fr_coord_inst_event_cb_t event_cb
Event callback in multi thread mode.
Definition coord_priv.h:48
void * inst_data
Instance data.
Definition coord_priv.h:45
uint32_t coord_cb_id
Callback ID for this message.
Definition coord_priv.h:40
uint32_t worker
Worker ID.
Definition coord_priv.h:33
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
Definition coord_priv.h:46
fr_event_post_cb_t event_post_cb
Post-event callback in single thread mode.
Definition coord_priv.h:47
fr_message_t m
Message containing data being sent.
Definition coord_priv.h:39
List / data message used between workers and coordinators.
Definition coord_priv.h:38
Generic control message used between workers and coordinators.
Definition coord_priv.h:32
#define fr_dbuff_used(_dbuff_or_marker)
Return the number of bytes remaining between the start of the dbuff or marker and the current position.
Definition dbuff.h:775
#define fr_dbuff_init(_out, _start, _len_or_end)
Initialise a dbuff for encoding or decoding.
Definition dbuff.h:362
#define fr_dbuff_buff(_dbuff_or_marker)
Return the underlying buffer in a dbuff or one of its markers.
Definition dbuff.h:890
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
#define fr_dlist_foreach(_list_head, _type, _iter)
Iterate over the contents of a list.
Definition dlist.h:98
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
free(array)
talloc_free(hp)
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
void fr_control_wait(fr_control_t *c)
Wait for a control plane to become readable.
Definition control.c:542
The control structure.
Definition control.c:76
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2177
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2045
#define fr_time()
Definition event.c:60
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1953
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2000
Stores all information relating to an event list.
Definition event.c:377
unsigned int uint32_t
unsigned char uint8_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
#define fr_assert(_expr)
Definition rad_assert.h:37
#define DEBUG2(fmt,...)
#define INFO(fmt,...)
Definition radict.c:63
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:899
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
struct fr_rb_tree_s fr_rb_tree_t
Definition rb.h:51
#define fr_rb_inline_talloc_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree that verifies elements are of a specific talloc type.
Definition rb.h:155
uint32_t num_elements
How many elements are inside the tree.
Definition rb.h:95
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:103
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
Definition schedule.c:109
sem_t fr_sem_t
Definition semaphore.h:53
static const uchar sc[16]
Definition smbdes.c:115
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
void fr_thread_start(fr_thread_t *thread, fr_sem_t *sem)
Signal the parent that we're done.
Definition thread.c:203
int fr_thread_wait_list(fr_sem_t *sem, fr_dlist_head_t *head)
Wait for multiple threads to signal readiness via a semaphore.
Definition thread.c:79
int fr_thread_create(pthread_t *thread, fr_thread_entry_t func, void *arg)
Create a joinable thread.
Definition thread.c:46
int fr_thread_setup(fr_thread_t *out, char const *name)
Common setup for child threads: block signals, allocate a talloc context, and create an event list.
Definition thread.c:112
int fr_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el)
Instantiate thread-specific data for modules, virtual servers, xlats, unlang, and TLS.
Definition thread.c:165
void fr_thread_detach(void)
Detach thread-specific data for modules, virtual servers, xlats.
Definition thread.c:190
void fr_thread_exit(fr_thread_t *thread, fr_thread_status_t status, fr_sem_t *sem)
Signal the parent that we're done.
Definition thread.c:218
fr_thread_status_t
Track the child thread status.
Definition thread.h:38
@ FR_THREAD_EXITED
exited, and in the exited queue
Definition thread.h:42
@ FR_THREAD_FAIL
failed, and in the exited queue
Definition thread.h:43
"server local" time.
Definition time.h:69
static fr_event_list_t * el
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340