The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
coord.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: fdbd6a4f007de249e202c3bb878d5bdadfc04aba $
19 *
20 * @brief Coordination thread management
21 * @file io/coord.c
22 *
23 * @copyright 2026 Network RADIUS SAS (legal@networkradius.com)
24 */
25RCSID("$Id: fdbd6a4f007de249e202c3bb878d5bdadfc04aba $")
26
27#include <freeradius-devel/io/listen.h>
28#include <freeradius-devel/io/schedule.h>
29#include <freeradius-devel/io/thread.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/unlang/base.h>
32#include <freeradius-devel/util/syserror.h>
33
34#include <stdalign.h>
35
36#define FR_CONTROL_ID_COORD_WORKER_ATTACH (1) //!< Message sent from worker to attach to a coordinator
37#define FR_CONTROL_ID_COORD_WORKER_DETACH (2) //!< Message sent from worker to detach from a coordinator
38#define FR_CONTROL_ID_COORD_WORKER_ACK (3) //!< Message sent to worker to acknowledge attach / detach
39#define FR_CONTROL_ID_COORD_DATA (4) //!< Worker <-> coordinator message to pass data to a callback
40
41#define MIN_WORKER_ID -1 //!< The minimum value we expect as worker id. -1 is the main thread.
42
46
/** A coordinator which receives messages from workers
 *
 * The coord_send_* arrays hold one slot per thread which may attach, and are
 * indexed by (worker id - MIN_WORKER_ID), so slot 0 belongs to the main
 * thread (worker id -1).
 */
struct fr_coord_s {
	fr_coord_reg_t		*coord_reg;		//!< Registration this coordinator was created from.
	fr_event_list_t		*el;			//!< Event list the coordinator's control plane runs on.
	fr_rb_node_t		node;			//!< Entry in the tree of coordinators.
	fr_coord_cb_reg_t	*callbacks;		//!< Array of callbacks for worker -> coordinator messages,
							///< taken from the registration.
	uint32_t		num_callbacks;		//!< Number of callbacks defined, used to sanity check
							///< the callback id in received data.
	fr_coord_cb_inst_t	**cb_inst;		//!< Array of callback instance specific data, one slot per
							///< callback.  NULL where no instance was created.

	uint32_t		max_workers;		//!< Maximum number of workers we expect.
	uint32_t		num_workers;		//!< How many workers are currently attached.

	fr_control_t		*coord_recv_control;	//!< Control plane for worker -> coordinator messages.
	fr_atomic_queue_t	*coord_recv_aq;		//!< Atomic queue for worker -> coordinator data.
	fr_ring_buffer_t	**coord_send_rb;	//!< Ring buffers for coordinator -> worker control messages.
	fr_control_t		**coord_send_control;	//!< Control planes for coordinator -> worker messages.
							///< Set when a worker attaches, cleared when it detaches.
	fr_message_set_t	**coord_send_ms;	//!< Message sets for coordinator -> worker data.
	fr_atomic_queue_t	**coord_send_aq;	//!< Atomic queues for coordinator -> worker data.
							///< Set when a worker attaches, cleared when it detaches.

	bool			exiting;		//!< Is this coordinator shutting down.  Set when a
							///< detaching worker reports that the server is exiting.
	bool			single_thread;		//!< Are we in single thread mode.
};
70
71/** The worker end of worker <-> coordinator communication.
72 */
74 fr_coord_t *coord; //!< Coordinator this worker is related to
75 fr_ring_buffer_t *worker_send_rb; //!< Ring buffer for worker -> coordinator control plane
76 fr_message_set_t *worker_send_ms; //!< Message set for worker -> coordinator messages
77 fr_control_t *worker_recv_control; //!< Coordinator -> worker control plane
78 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue for coordinator -> worker messages
79 fr_coord_worker_cb_reg_t *callbacks; //!< Callbacks for coordinator -> worker messages
80 uint32_t num_callbacks; //!< Number of callbacks registered.
81};
82
83/** A coordinator registration
84 */
86 char const *name; //!< Name for debugging.
87 fr_dlist_t entry; //!< Entry in list of registrations.
88 fr_coord_cb_reg_t *coord_cb; //!< Callbacks for worker -> coordinator messages.
89 fr_coord_worker_cb_reg_t *worker_cb; //!< Callbacks for coordinator -> worker messages.
90 size_t worker_send_size; //!< Initial size for worker -> coordinator ring buffer.
91 size_t coord_send_size; //!< Initial size for coordinator -> worker ring buffer.
92 module_instance_t const *mi; //!< Module instance which registered this coordinator.
93};
94
95/** Scheduler specific information for coordinator threads
96 */
97typedef struct {
98 fr_thread_t thread; //!< common thread information - must be first!
99
100 uint32_t max_workers; //!< Maximum number of workers which will connect to this coordinator.
101 fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
102 fr_coord_t *coord; //!< The coordinator data structure.
103 fr_sem_t *sem; //!< For inter-thread signaling.
105
106/** Control plane message used for workers attaching / detaching to coordinators
107 */
108typedef struct {
109 int32_t worker; //!< Worker ID
110 fr_control_t *worker_recv_control; //!< Control plane to send messages to this worker
111 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue to send data to this worker
113
114typedef struct {
115 int32_t worker; //!< Worker ID
116 bool exiting; //!< Is the server exiting
118
119/** Compare coordinators by registration
120 */
121static int8_t coord_cmp(void const *one, void const *two)
122{
123 fr_coord_t const *a = one, *b = two;
124
125 return CMP(a->coord_reg, b->coord_reg);
126}
127
128/** Register a coordinator
129 *
130 * To be called from mod_instantiate of a module which uses a coordinator
131 *
132 * @param reg_ctx Registration data
133 * @return
134 * - coordinator registration on success
135 * - NULL on failure
136 */
138{
139 fr_coord_reg_t *coord_reg;
140
141 fr_assert(reg_ctx->coord_cb);
142 fr_assert(reg_ctx->worker_cb);
143 fr_assert(reg_ctx->mi);
144
145 /* Allocate the list of registered coordinators if not already done */
146 if (!coord_regs) {
147 MEM(coord_regs = talloc_zero(NULL, fr_dlist_head_t));
149 }
150
151 MEM(coord_reg = talloc(coord_regs, fr_coord_reg_t));
152 *coord_reg = (fr_coord_reg_t) {
153 .name = reg_ctx->name,
154 .coord_cb = reg_ctx->coord_cb,
155 .worker_cb = reg_ctx->worker_cb,
156 .worker_send_size = reg_ctx->worker_send_size ? reg_ctx->worker_send_size : 4096,
157 .coord_send_size = reg_ctx->coord_send_size ? reg_ctx->coord_send_size : 4096,
158 .mi = reg_ctx->mi,
159 };
160
162
163 return coord_reg;
164}
165
166/** De-register a coordinator
167 *
168 * To be called from mod_detach of a module which uses a coordinator
169 *
170 * When running in threaded mode, will wait for the coordinator to exit.
171 *
172 * @param coord_reg to de-register
173 */
175{
176 int ret;
177
178 fr_dlist_remove(coord_regs, coord_reg);
179
180 /*
181 * In single threaded mode just free the registration.
182 */
183 if (!coord_threads) goto free;
184
186 if (sc->coord_reg == coord_reg) {
187 if ((ret = pthread_join(sc->thread.pthread_id, NULL)) != 0) {
188 ERROR("Failed joining coordinator %s: %s", coord_reg->name, fr_syserror(ret));
189 } else {
190 DEBUG2("Coordinator %s joined (cleaned up)", coord_reg->name);
191 }
192
195 break;
196 }
197 }
198
199free:
200 talloc_free(coord_reg);
201
202 if (fr_dlist_num_elements(coord_regs) == 0) TALLOC_FREE(coord_regs);
203}
204
205/** Callback for a coordinator receiving data from a worker
206 */
207static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
208{
209 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
211 fr_coord_data_t *cd;
212 fr_dbuff_t dbuff;
213
214 fr_assert(data_size == sizeof(cm));
215 memcpy(&cm, data, data_size);
216 fr_assert((cm.worker >= MIN_WORKER_ID) && (cm.worker < (int32_t)coord->max_workers));
217
218 if (unlikely(!fr_atomic_queue_pop(coord->coord_recv_aq, (void **)&cd))) return;
219
220 DEBUG3("Coordinator %s got data from worker %d for callback %d",
221 coord->coord_reg->name, cm.worker, cd->coord_cb_id);
222
223 if (cd->coord_cb_id >= coord->num_callbacks) {
224 ERROR("Received data for callback %d which is not defined", cd->coord_cb_id);
225 fr_message_done(&cd->m);
226 return;
227 }
228
229 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
230 coord->callbacks[cd->coord_cb_id].callback(coord, cm.worker, &dbuff, now,
231 coord->cb_inst[cd->coord_cb_id] ?
232 coord->cb_inst[cd->coord_cb_id]->inst_data : NULL,
233 coord->callbacks[cd->coord_cb_id].uctx);
234 fr_message_done(&cd->m);
235}
236
237/** Callback for a worker receiving data from a coordinator
238 */
239static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
240{
241 fr_coord_worker_t *cw = talloc_get_type_abort(ctx, fr_coord_worker_t);
243 fr_coord_data_t *cd;
244 fr_dbuff_t dbuff;
245
246 fr_assert(data_size == sizeof(cm));
247 memcpy(&cm, data, data_size);
248
249 if (unlikely(!fr_atomic_queue_pop(cw->worker_recv_aq, (void **)&cd))) return;
250
251 DEBUG3("Coordinator %s sent message for callback %d", cw->coord->coord_reg->name, cd->coord_cb_id);
252
253 if (cd->coord_cb_id >= cw->num_callbacks) {
254 ERROR("Received message for callback %d which is not defined", cd->coord_cb_id);
255 fr_message_done(&cd->m);
256 return;
257 }
258
259 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
260 cw->callbacks[cd->coord_cb_id].callback(cw, &dbuff, now,
262 module_thread(cw->coord->coord_reg->mi)->data, NULL, NULL),
263 cw->callbacks[cd->coord_cb_id].uctx);
264 fr_message_done(&cd->m);
265}
266
267/** Callback run by a coordinator when a worker attaches
268 */
269static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
270{
271 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
273 fr_coord_msg_t ack;
274 uint32_t thread_id;
275
276 fr_assert(data_size == sizeof(fr_coord_worker_attach_msg_t));
277 fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
278
279 DEBUG2("Worker %d attached to %s", msg->worker, coord->coord_reg->name);
280 coord->num_workers++;
281 thread_id = msg->worker - MIN_WORKER_ID;
282 coord->coord_send_control[thread_id] = msg->worker_recv_control;
283 coord->coord_send_aq[thread_id] = msg->worker_recv_aq;
284
285 ack.worker = msg->worker;
286 fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
287 FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(ack));
288}
289
290/** Callback run by a coordinator when a worker detaches
291 */
292static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
293{
294 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
296 fr_coord_msg_t ack;
297 uint32_t thread_id;
298
299 fr_assert(data_size == sizeof(fr_coord_worker_detach_msg_t));
300 fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
301 thread_id = msg->worker - MIN_WORKER_ID;
302
303 DEBUG2("Worker %d detached from %s", msg->worker, coord->coord_reg->name);
304 coord->num_workers--;
305
306 ack.worker = msg->worker;
307 fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
309
310 coord->coord_send_control[thread_id] = NULL;
311 coord->coord_send_aq[thread_id] = NULL;
312 if (msg->exiting) coord->exiting = true;
313}
314
315/** Create a coordinator from its registration
316 *
317 * @param ctx to allocate the coordinator in
318 * @param el Event list to run this coordinator
319 * @param coord_reg Registration to configure this coordinator
320 * @param single_thread Is the server in single thread mode
321 * @param max_workers The maximum number of workers which will attach
322 * @return
323 * - the coordinator on success
324 * - NULL on failure
325 */
326static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg,
327 bool single_thread, uint32_t max_workers)
328{
329 fr_coord_t *coord;
330 uint32_t i, num_threads = max_workers - MIN_WORKER_ID;
331 fr_coord_cb_reg_t *cb = coord_reg->coord_cb;
333
334 MEM(coord = talloc(ctx, fr_coord_t));
335 *coord = (fr_coord_t) {
336 .el = el,
337 .coord_reg = coord_reg,
338 .single_thread = single_thread,
339 .max_workers = max_workers
340 };
341
342 /* Allocate atomic queue / control for receiving messages from workers */
344 if (!aq) {
345 fr_strerror_const("Failed creating worker -> coordinator atomic queue");
346 fail:
347 talloc_free(coord);
348 return NULL;
349 }
350 coord->coord_recv_control = fr_control_create(coord, el, aq, 5);
351 if (!coord->coord_recv_control) {
352 fr_strerror_const("Failed creating worker -> coordinator control plane");
353 goto fail;
354 }
355
356 /* Allocate atomic queue for workers sending data to coordinators */
358 if (!coord->coord_recv_aq) {
359 fr_strerror_const("Failed creating worker -> coordinator data atomic queue");
360 goto fail;
361 }
362
364 coord, coord_worker_attach) < 0) goto fail;
366 coord, coord_worker_detach) < 0) goto fail;
368 coord, coord_data_recv) < 0) goto fail;
369
370 /* Count the number of callbacks defined, for sanity checking messages */
371 while (cb->callback) {
372 coord->num_callbacks++;
373 cb++;
374 }
375 coord->callbacks = coord_reg->coord_cb;
376
377 if (fr_control_open(coord->coord_recv_control) < 0) {
378 fr_strerror_const("Failed opening control plane");
379 goto fail;
380 }
381
382 /*
383 * Coordinator side arrays for holding pointers to worker
384 * specific communication structures. The array sizes are the
385 * number of threads expected to attach which is the number of
386 * workers plus any additional threads, currently just the main
387 * thread (worker id -1)
388 */
389 MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, num_threads));
390 MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, num_threads));
391 for (i = 0; i < num_threads; i++) {
393 if (!coord->coord_send_rb[i]) goto fail;
394
396 coord_reg->coord_send_size, true);
397 if (!coord->coord_send_ms[i]) goto fail;
398 }
399 MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, num_threads));
400 MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, num_threads));
401
402 MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
403
404 for (i = 0; i < coord->num_callbacks; i++) {
405 if (!coord->callbacks[i].inst_create) continue;
406 coord->cb_inst[i] = coord->callbacks[i].inst_create(coord, coord, coord->el, coord->single_thread,
407 coord->callbacks[i].uctx);
408 if (!coord->cb_inst[i]) goto fail;
409 }
410
411 return coord;
412}
413
414static void fr_coord_destroy(fr_coord_t *coord){
415 uint32_t i;
416
417 for (i = 0; i < coord->num_callbacks; i++) {
418 if (!coord->callbacks[i].inst_destroy) continue;
419 coord->callbacks[i].inst_destroy(coord, coord->cb_inst[i], coord->single_thread,
420 coord->callbacks[i].uctx);
421 }
422}
423
424/** Run the event loop for a coordinator thread when in multi-threaded mode
425 */
426static void fr_coordinate(fr_coord_t *coord)
427{
428 uint32_t i;
429 fr_coord_cb_inst_t *cb_inst;
430
431 /*
432 * Run until we're told to exit AND the number of
433 * workers has dropped to zero.
434 *
435 * Whenever a worker detaches, coord->num_workers
436 * is decremented, so when coord->num_workers == 0,
437 * all workers have detached and are no longer using
438 * the channel.
439 */
440 while (likely(!(coord->exiting && (coord->num_workers == 0)))) {
441 int num_events;
442
443 /*
444 * Check the event list. If there's an error
445 * (e.g. exit), we stop looping and clean up.
446 */
447 DEBUG4("Gathering events");
448 num_events = fr_event_corral(coord->el, fr_time(), true);
449 DEBUG4("%u event(s) pending%s",
450 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
451 if (num_events < 0) break;
452
453 /*
454 * Service outstanding events.
455 */
456 if (num_events > 0) {
457 DEBUG4("Servicing event(s)");
458 fr_event_service(coord->el);
459 }
460
461 /*
462 * Run any registered instance specific event callbacks
463 */
464 for (i = 0; i < coord->num_callbacks; i++) {
465 cb_inst = coord->cb_inst[i];
466 if (cb_inst && cb_inst->event_cb) cb_inst->event_cb(coord->el, cb_inst->inst_data);
467 }
468 }
469
470 fr_coord_destroy(coord);
471
472 return;
473}
474
475/** Entry point for a coordinator thread
476 */
477static void *fr_coordinate_thread(void *arg)
478{
479 fr_schedule_coord_t *sc = talloc_get_type_abort(arg, fr_schedule_coord_t);
480 fr_coord_reg_t *coord_reg = sc->coord_reg;
482 char coordinate_name[64];
483
484 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinate %s", coord_reg->name);
485
486 if (fr_thread_setup(&sc->thread, coordinate_name) < 0) goto fail;
487
488 sc->coord = fr_coord_create(sc->thread.ctx, sc->thread.el, coord_reg, false, sc->max_workers);
489 if (!sc->coord) {
490 PERROR("%s - Failed creating coordinator thread", coordinate_name);
491 goto fail;
492 }
493
494 /*
495 * Create all the thread specific data for the coordinator thread
496 */
497 if (fr_thread_instantiate(sc->thread.ctx, sc->thread.el) < 0) goto fail;
498
499 /*
500 * Tell the originator that the thread has started.
501 */
502 fr_thread_start(&sc->thread, sc->sem);
503
504 fr_coordinate(sc->coord);
505
506 status = FR_THREAD_EXITED;
507
508fail:
510
511 fr_thread_exit(&sc->thread, status, sc->sem);
512
513 return NULL;
514}
515
516/** Start all registered coordinator threads in multi-threaded mode
517 *
518 * @param num_workers The number of workers which will be attaching
519 * @param sem Semaphore used to signal that the threads are ready
520 * @return
521 * - 0 on success
522 * - -1 on failure
523 */
525{
526 int num = 0;
527
528 if (!coord_regs) return 0;
529
530 MEM(coord_threads = talloc(NULL, fr_dlist_head_t));
533
536
537 MEM(sc = talloc_zero(coord_threads, fr_schedule_coord_t));
538
539 sc->thread.id = num++;
540 sc->coord_reg = coord_reg;
541 sc->max_workers = num_workers;
542 sc->sem = sem;
543
544 if (fr_thread_create(&sc->thread.pthread_id, fr_coordinate_thread, sc) < 0) {
546 PERROR("Failed creating coordinator %s", coord_reg->name);
547 return -1;
548 };
549
551 }
552
553 /*
554 * Wait for all the coordinators to start.
555 */
556 if (fr_thread_wait_list(sem, coord_threads) < 0) {
557 ERROR("Failed creating coordinator threads");
558 return -1;
559 }
560
561 /*
562 * Insert the coordinators in the tree
563 */
565 fr_assert(sc->coord);
566 fr_rb_insert(&coords, sc->coord);
567 }
568
569 return 0;
570}
571
572/** Clean up coordinators in single threaded mode
573 */
575{
576 fr_coord_t *coord;
578
579 if (fr_rb_num_elements(&coords) == 0) return;
580
581 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
582 coord;
583 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
585 fr_coord_destroy(coord);
586 talloc_free(coord);
587 }
588}
589
590/** Start coordinators in single threaded mode
591 */
592int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
593{
594 if (!coord_regs) return 0;
595
597
599 char coordinate_name[64];
600 fr_coord_t *coord;
601
602 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinator %s", coord_reg->name);
603
604 INFO("%s - Starting", coordinate_name);
605
606 coord = fr_coord_create(ctx, el, coord_reg, true, 1);
607 if (!coord) {
608 PERROR("%s - Failed creating coordinator thread", coordinate_name);
609 return -1;
610 }
611
612 fr_rb_insert(&coords, coord);
613 }
614
615 return 0;
616}
617
618/** Signal a coordinator that a worker wants to detach
619 *
620 * @param cw Worker which is detaching.
621 * @param exiting Is the server exiting.
622 */
623int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
624{
626
627 msg = talloc(cw, fr_coord_worker_detach_msg_t);
628 msg->worker = fr_schedule_worker_id();
629 msg->exiting = exiting;
630
633 msg, sizeof(fr_coord_worker_detach_msg_t)) < 0) return -1;
634
636
637 return 0;
638}
639
640/** A worker got an ack from a coordinator in response to attach / detach
641 */
642static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size,
643 UNUSED fr_time_t now)
644{
645#ifndef NDEBUG
646 fr_coord_msg_t const *cm = data;
647
648 fr_assert(data_size == sizeof(fr_coord_msg_t));
650#endif
651}
652
653/** Attach a worker to a coordinator
654 *
655 * @param ctx To allocate worker structure in
656 * @param el Event list for control messages
657 * @param coord_reg Coordinator registration to attach to.
658 * @return
659 * - Worker structure for coordinator use on success
660 * - NULL on failure
661 */
663{
665 fr_coord_worker_cb_reg_t *cb_reg = coord_reg->worker_cb;
667 fr_coord_t find;
669
670 cw = talloc_zero(ctx, fr_coord_worker_t);
671
672 find = (fr_coord_t) {
673 .coord_reg = coord_reg
674 };
675 cw->coord = fr_rb_find(&coords, &find);
676 if (!cw->coord) {
677 fail:
678 talloc_free(cw);
679 return NULL;
680 }
681
682 aq = fr_atomic_queue_talloc(cw, 1024);
687 coord_reg->worker_send_size, true);
688
689 while (cb_reg->callback) {
690 cw->num_callbacks++;
691 cb_reg++;
692 }
693 cw->callbacks = coord_reg->worker_cb;
694
696 cw, coordinate_worker_ack) < 0) goto fail;
698 cw, coord_worker_data_recv) < 0) goto fail;
699
700 if (fr_control_open(cw->worker_recv_control) < 0) goto fail;
701
702 msg.worker_recv_control = cw->worker_recv_control;
703 msg.worker_recv_aq = cw->worker_recv_aq;
704 msg.worker = fr_schedule_worker_id();
705
708 &msg, sizeof(fr_coord_worker_attach_msg_t)) < 0) goto fail;
709
711
712 return cw;
713}
714
715/** Send generic data from a coordinator to a worker
716 *
717 * @param coord Coordinator which is sending the data.
718 * @param worker_id Worker to send data to.
719 * @param cb_id Callback ID for the worker to run.
720 * @param dbuff Buffer containing data to send.
721 * @return
722 * - 0 on success
723 * - <0 on failure
724 */
726{
728 fr_coord_data_t *cd = NULL;
729 uint32_t thread_id = worker_id - MIN_WORKER_ID;
730
731 fr_assert((worker_id >= MIN_WORKER_ID) && (worker_id < (int32_t)coord->max_workers));
732
733 cm = (fr_coord_msg_t) {
735 };
736
737 cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[thread_id], (fr_message_t *)cd,
738 fr_dbuff_used(dbuff));
739 if (!cd) return -1;
740
741 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
742 cd->coord_cb_id = cb_id;
743 if (!fr_atomic_queue_push(coord->coord_send_aq[thread_id], cd)) {
745 return -1;
746 }
747 return fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
749 &cm, sizeof(fr_coord_msg_t));
750}
751
752/** Broadcast data from a coordinator to all workers
753 *
754 * @param coord Coordinator which is sending the data.
755 * @param cb_id Callback ID for the workers to run.
756 * @param dbuff Buffer containing data to send.
757 * @return
758 * - 0 on success
759 * - <0 on failure - indicating the number of sends which failed.
760 */
762{
763 uint32_t i;
764 int failed = 0;
765
766 for (i = 0; i < coord->max_workers; i++) {
767 if (!coord->coord_send_control[i - MIN_WORKER_ID]) continue;
768 if (fr_coord_to_worker_send(coord, i, cb_id, dbuff) < 0) failed++;
769 }
770
771 return 0 - failed;
772}
773
774/** Send data from a worker to a coordinator
775 *
776 * @param cw Worker side of coordinator sending the data.
777 * @param cb_id Callback ID for the coordinator to run.
778 * @param dbuff Buffer containing data to send.
779 * @return
780 * - 0 on success
781 * - < 0 on failure
782 */
784{
786 fr_coord_data_t *cd = NULL;
787
788 cm = (fr_coord_msg_t) {
790 };
791
793 if (!cd) return -1;
794
795 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
796 cd->coord_cb_id = cb_id;
799 return -1;
800 }
801
804}
805
806/** Insert instance specific pre-event callbacks
807 */
809{
810 fr_coord_t *coord;
812 fr_coord_cb_inst_t *cb_inst;
813 uint32_t i;
814
815 if (!coord_regs) return 0;
816
817 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
818 coord != NULL;
819 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
820 for (i = 0; i < coord->num_callbacks; i++) {
821 cb_inst = coord->cb_inst[i];
822 if (cb_inst && cb_inst->event_pre_cb &&
823 fr_event_pre_insert(el, cb_inst->event_pre_cb, cb_inst->inst_data) < 0) {
824 return -1;
825 }
826 }
827 }
828 return 0;
829}
830
831/** Insert instance specific post-event callbacks
832 */
834{
835 fr_coord_t *coord;
837 fr_coord_cb_inst_t *cb_inst;
838 uint32_t i;
839
840 if (!coord_regs) return 0;
841
842 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
843 coord != NULL;
844 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
845 for (i = 0; i < coord->num_callbacks; i++) {
846 cb_inst = coord->cb_inst[i];
847 if (cb_inst && cb_inst->event_post_cb &&
848 fr_event_post_insert(el, cb_inst->event_post_cb, cb_inst->inst_data) < 0) {
849 return -1;
850 }
851 }
852 }
853 return 0;
854}
log_entry msg
Definition acutest.h:794
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_talloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:512
#define NDEBUG_UNUSED
Definition build.h:347
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:407
#define UNUSED
Definition build.h:336
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static size_t num_workers
static fr_atomic_queue_t ** aq
fr_dlist_t entry
Entry in list of registrations.
Definition coord.c:87
static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker attaches.
Definition coord.c:269
fr_atomic_queue_t * worker_recv_aq
Atomic queue for coordinator -> worker messages.
Definition coord.c:78
uint32_t max_workers
Maximum number of workers which will connect to this coordinator.
Definition coord.c:100
fr_control_t ** coord_send_control
Control planes for coordinator -> worker messages.
Definition coord.c:63
#define FR_CONTROL_ID_COORD_WORKER_DETACH
Message sent from worker to detach from a coordinator.
Definition coord.c:37
static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A worker got an ack from a coordinator in response to attach / detach.
Definition coord.c:642
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:101
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.c:89
fr_coord_reg_t * fr_coord_register(fr_coord_reg_ctx_t *reg_ctx)
Register a coordinator.
Definition coord.c:137
int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
Start coordinators in single threaded mode.
Definition coord.c:592
fr_control_t * worker_recv_control
Control plane to send messages to this worker.
Definition coord.c:110
static fr_dlist_head_t * coord_threads
Definition coord.c:44
static fr_coord_t * fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg, bool single_thread, uint32_t max_workers)
Create a coordinator from its registration.
Definition coord.c:326
fr_coord_worker_t * fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg)
Attach a worker to a coordinator.
Definition coord.c:662
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:50
size_t worker_send_size
Initial size for worker -> coordinator ring buffer.
Definition coord.c:90
size_t coord_send_size
Initial size for coordinator -> worker ring buffer.
Definition coord.c:91
int32_t worker
Worker ID.
Definition coord.c:109
#define MIN_WORKER_ID
The minimum value we expect as worker id. -1 is the main thread.
Definition coord.c:41
int fr_coord_to_worker_send(fr_coord_t *coord, int32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
Definition coord.c:725
fr_ring_buffer_t ** coord_send_rb
Ring buffers for coordinator -> worker control messages.
Definition coord.c:62
uint32_t max_workers
Maximum number of workers we expect.
Definition coord.c:57
bool exiting
Is the server exiting.
Definition coord.c:116
fr_atomic_queue_t * worker_recv_aq
Atomic queue to send data to this worker.
Definition coord.c:111
static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a worker receiving data from a coordinator.
Definition coord.c:239
static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker detaches.
Definition coord.c:292
fr_coord_t * coord
Coordinator this worker is related to.
Definition coord.c:74
fr_coord_cb_inst_t ** cb_inst
Array of callback instance specific data.
Definition coord.c:55
void fr_coord_deregister(fr_coord_reg_t *coord_reg)
De-register a coordinator.
Definition coord.c:174
void fr_coords_destroy(void)
Clean up coordinators in single threaded mode.
Definition coord.c:574
static int8_t coord_cmp(void const *one, void const *two)
Compare coordinators by registration.
Definition coord.c:121
fr_control_t * coord_recv_control
Control plane for worker -> coordinator messages.
Definition coord.c:60
#define FR_CONTROL_ID_COORD_WORKER_ATTACH
Message sent from worker to attach to a coordinator.
Definition coord.c:36
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
Definition coord.c:783
static fr_dlist_head_t * coord_regs
Definition coord.c:43
module_instance_t const * mi
Module instance which registered this coordinator.
Definition coord.c:92
fr_coord_cb_reg_t * callbacks
Array of callbacks for worker -> coordinator messages.
Definition coord.c:53
int fr_coord_post_event_insert(fr_event_list_t *el)
Insert instance specific post-event callbacks.
Definition coord.c:833
int fr_coord_pre_event_insert(fr_event_list_t *el)
Insert instance specific pre-event callbacks.
Definition coord.c:808
uint32_t num_callbacks
Number of callbacks registered.
Definition coord.c:80
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.c:88
fr_atomic_queue_t * coord_recv_aq
Atomic queue for worker -> coordinator.
Definition coord.c:61
static void * fr_coordinate_thread(void *arg)
Entry point for a coordinator thread.
Definition coord.c:477
static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a coordinator receiving data from a worker.
Definition coord.c:207
fr_atomic_queue_t ** coord_send_aq
Atomic queues for coordinator -> worker data.
Definition coord.c:65
uint32_t num_callbacks
Number of callbacks defined.
Definition coord.c:54
bool exiting
Is this coordinator shutting down.
Definition coord.c:67
fr_rb_node_t node
Entry in the tree of coordinators.
Definition coord.c:52
fr_coord_worker_cb_reg_t * callbacks
Callbacks for coordinator -> worker messages.
Definition coord.c:79
fr_event_list_t * el
Coordinator event list.
Definition coord.c:51
int fr_coord_start(uint32_t num_workers, fr_sem_t *sem)
Start all registered coordinator threads in multi-threaded mode.
Definition coord.c:524
#define FR_CONTROL_ID_COORD_WORKER_ACK
Message sent to worker to acknowledge attach / detach.
Definition coord.c:38
#define FR_CONTROL_ID_COORD_DATA
Worker <-> coordinator message to pass data to a callback.
Definition coord.c:39
fr_message_set_t ** coord_send_ms
Message sets for coordinator -> worker data.
Definition coord.c:64
fr_thread_t thread
common thread information - must be first!
Definition coord.c:98
static void fr_coordinate(fr_coord_t *coord)
Run the event loop for a coordinator thread when in multi-threaded mode.
Definition coord.c:426
fr_control_t * worker_recv_control
Coordinator -> worker control plane.
Definition coord.c:77
bool single_thread
Are we in single thread mode.
Definition coord.c:68
char const * name
Name for debugging.
Definition coord.c:86
int fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff)
Broadcast data from a coordinator to all workers.
Definition coord.c:761
int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
Signal a coordinator that a worker wants to detach.
Definition coord.c:623
int32_t worker
Worker ID.
Definition coord.c:115
fr_ring_buffer_t * worker_send_rb
Ring buffer for worker -> coordinator control plane.
Definition coord.c:75
fr_coord_t * coord
The coordinator data structure.
Definition coord.c:102
static fr_rb_tree_t coords
Definition coord.c:45
uint32_t num_workers
How many workers are attached.
Definition coord.c:58
fr_message_set_t * worker_send_ms
Message set for worker -> coordinator messages.
Definition coord.c:76
static void fr_coord_destroy(fr_coord_t *coord)
Definition coord.c:414
fr_sem_t * sem
For inter-thread signaling.
Definition coord.c:103
A coordinator registration.
Definition coord.c:85
A coordinator which receives messages from workers.
Definition coord.c:49
Control plane message used for workers attaching / detaching to coordinators.
Definition coord.c:108
The worker end of worker <-> coordinator communication.
Definition coord.c:73
Scheduler specific information for coordinator threads.
Definition coord.c:97
fr_coord_cb_t callback
Definition coord.h:48
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.h:64
struct fr_coord_s fr_coord_t
Definition coord.h:37
void * uctx
Definition coord.h:51
char const * name
Name for this coordinator.
Definition coord.h:63
module_instance_t const * mi
Module instance registering this coordinator.
Definition coord.h:70
fr_coord_worker_cb_t callback
Definition coord.h:56
size_t coord_send_size
Initial ring buffer size for coordinator -> worker data.
Definition coord.h:68
struct fr_coord_reg_s fr_coord_reg_t
Definition coord.h:35
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.h:65
size_t worker_send_size
Initial ring buffer size for worker -> coordinator data.
Definition coord.h:66
fr_coord_cb_inst_create_t inst_create
Definition coord.h:49
fr_coord_cb_inst_destroy_t inst_destroy
Definition coord.h:50
fr_coord_inst_event_cb_t event_cb
Event callback in multi thread mode.
Definition coord_priv.h:48
void * inst_data
Instance data.
Definition coord_priv.h:45
int32_t worker
Worker ID.
Definition coord_priv.h:33
uint32_t coord_cb_id
Callback ID for this message.
Definition coord_priv.h:40
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
Definition coord_priv.h:46
fr_event_post_cb_t event_post_cb
Post-event callback in single thread mode.
Definition coord_priv.h:47
fr_message_t m
Message containing data being sent.
Definition coord_priv.h:39
List / data message used between workers and coordinators.
Definition coord_priv.h:38
Generic control message used between workers and coordinators.
Definition coord_priv.h:32
#define fr_dbuff_used(_dbuff_or_marker)
Return the number of bytes remaining between the start of the dbuff or marker and the current position.
Definition dbuff.h:775
#define fr_dbuff_init(_out, _start, _len_or_end)
Initialise a dbuff for encoding or decoding.
Definition dbuff.h:362
#define fr_dbuff_buff(_dbuff_or_marker)
Return the underlying buffer of a dbuff or of one of its markers.
Definition dbuff.h:890
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
#define fr_dlist_foreach(_list_head, _type, _iter)
Iterate over the contents of a list.
Definition dlist.h:98
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
#define fr_dlist_talloc_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:257
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
free(array)
talloc_free(hp)
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
void fr_control_wait(fr_control_t *c)
Wait for a control plane to become readable.
Definition control.c:542
The control structure.
Definition control.c:76
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2187
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2055
#define fr_time()
Definition event.c:60
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1963
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2010
Stores all information relating to an event list.
Definition event.c:377
unsigned int uint32_t
unsigned char uint8_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
#define MODULE_CTX(_mi, _thread, _env_data, _rctx)
Wrapper to create a module_ctx_t as a compound literal.
Definition module_ctx.h:128
#define fr_assert(_expr)
Definition rad_assert.h:37
#define DEBUG2(fmt,...)
#define INFO(fmt,...)
Definition radict.c:63
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:899
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
struct fr_rb_tree_s fr_rb_tree_t
Definition rb.h:51
#define fr_rb_inline_talloc_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree that verifies elements are of a specific talloc type.
Definition rb.h:155
uint32_t num_elements
How many elements are inside the tree.
Definition rb.h:95
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:104
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
Definition schedule.c:110
sem_t fr_sem_t
Definition semaphore.h:53
void * data
Thread specific instance data.
Definition module.h:374
static module_thread_instance_t * module_thread(module_instance_t const *mi)
Retrieve module/thread specific instance for a module.
Definition module.h:503
Module instance data.
Definition module.h:287
static const uchar sc[16]
Definition smbdes.c:115
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
char const * fr_syserror(int num)
Guaranteed to be a thread-safe version of strerror.
Definition syserror.c:243
void fr_thread_start(fr_thread_t *thread, fr_sem_t *sem)
Signal the parent that we've started.
Definition thread.c:203
int fr_thread_wait_list(fr_sem_t *sem, fr_dlist_head_t *head)
Wait for multiple threads to signal readiness via a semaphore.
Definition thread.c:79
int fr_thread_create(pthread_t *thread, fr_thread_entry_t func, void *arg)
Create a joinable thread.
Definition thread.c:46
int fr_thread_setup(fr_thread_t *out, char const *name)
Common setup for child threads: block signals, allocate a talloc context, and create an event list.
Definition thread.c:112
int fr_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el)
Instantiate thread-specific data for modules, virtual servers, xlats, unlang, and TLS.
Definition thread.c:165
void fr_thread_detach(void)
Detach thread-specific data for modules, virtual servers, xlats.
Definition thread.c:190
void fr_thread_exit(fr_thread_t *thread, fr_thread_status_t status, fr_sem_t *sem)
Signal the parent that we're done.
Definition thread.c:218
fr_thread_status_t
Track the child thread status.
Definition thread.h:38
@ FR_THREAD_EXITED
exited, and in the exited queue
Definition thread.h:42
@ FR_THREAD_FAIL
failed, and in the exited queue
Definition thread.h:43
"server local" time.
Definition time.h:69
static fr_event_list_t * el
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340