The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
coord.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: 1d2e086dac020b04e4fcf2b71c6cc3638a776087 $
19 *
20 * @brief Coordination thread management
21 * @file io/coord.c
22 *
23 * @copyright 2026 Network RADIUS SAS (legal@networkradius.com)
24 */
25RCSID("$Id: 1d2e086dac020b04e4fcf2b71c6cc3638a776087 $")
26
27#include <freeradius-devel/io/listen.h>
28#include <freeradius-devel/io/schedule.h>
29#include <freeradius-devel/io/thread.h>
30#include <freeradius-devel/io/coord_priv.h>
31#include <freeradius-devel/unlang/base.h>
32#include <freeradius-devel/util/syserror.h>
33
34#include <stdalign.h>
35
/*
 *	Control plane message IDs used between workers and coordinators.
 */
#define FR_CONTROL_ID_COORD_WORKER_ATTACH	(1)	//!< Message sent from worker to attach to a coordinator
#define FR_CONTROL_ID_COORD_WORKER_DETACH	(2)	//!< Message sent from worker to detach from a coordinator
#define FR_CONTROL_ID_COORD_WORKER_ACK		(3)	//!< Message sent to worker to acknowledge attach / detach
#define FR_CONTROL_ID_COORD_DATA		(4)	//!< Worker <-> coordinator message to pass data to a callback

/*
 *	Parenthesized, like the IDs above, so the negative constant always
 *	expands as a single operand wherever it is used in an expression.
 */
#define MIN_WORKER_ID				(-1)	//!< The minimum value we expect as worker id.  -1 is the main thread.
42
46
47/** A coordinator which receives messages from workers
48 */
49struct fr_coord_s {
50 fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
51 fr_event_list_t *el; //!< Coordinator event list.
52 fr_rb_node_t node; //!< Entry in the tree of coordinators.
53 fr_coord_cb_reg_t *callbacks; //!< Array of callbacks for worker -> coordinator messages.
54 uint32_t num_callbacks; //!< Number of callbacks defined.
55 fr_coord_cb_inst_t **cb_inst; //!< Array of callback instance specific data.
56
57 uint32_t max_workers; //!< Maximum number of workers we expect.
58 uint32_t num_workers; //!< How many workers are attached.
59
60 fr_control_t *coord_recv_control; //!< Control plane for worker -> coordinator messages.
61 fr_atomic_queue_t *coord_recv_aq; //!< Atomic queue for worker -> coordinator
62 fr_ring_buffer_t **coord_send_rb; //!< Ring buffers for coordinator -> worker control messages.
63 fr_control_t **coord_send_control; //!< Control planes for coordinator -> worker messages.
64 fr_message_set_t **coord_send_ms; //!< Message sets for coordinator -> worker data.
65 fr_atomic_queue_t **coord_send_aq; //!< Atomic queues for coordinator -> worker data.
66
67 bool exiting; //!< Is this coordinator shutting down.
68 bool single_thread; //!< Are we in single thread mode.
69};
70
71/** The worker end of worker <-> coordinator communication.
72 */
74 fr_coord_t *coord; //!< Coordinator this worker is related to
75 fr_ring_buffer_t *worker_send_rb; //!< Ring buffer for worker -> coordinator control plane
76 fr_message_set_t *worker_send_ms; //!< Message set for worker -> coordinator messages
77 fr_control_t *worker_recv_control; //!< Coordinator -> worker control plane
78 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue for coordinator -> worker messages
79 fr_coord_worker_cb_reg_t *callbacks; //!< Callbacks for coordinator -> worker messages
80 uint32_t num_callbacks; //!< Number of callbacks registered.
81};
82
83/** A coordinator registration
84 */
86 char const *name; //!< Name for debugging.
87 fr_dlist_t entry; //!< Entry in list of registrations.
88 fr_coord_cb_reg_t *coord_cb; //!< Callbacks for worker -> coordinator messages.
89 fr_coord_worker_cb_reg_t *worker_cb; //!< Callbacks for coordinator -> worker messages.
90 size_t worker_send_size; //!< Initial size for worker -> coordinator ring buffer.
91 size_t coord_send_size; //!< Initial size for coordinator -> worker ring buffer.
92 module_instance_t const *mi; //!< Module instance which registered this coordinator.
93};
94
95/** Scheduler specific information for coordinator threads
96 */
97typedef struct {
98 fr_thread_t thread; //!< common thread information - must be first!
99
100 uint32_t max_workers; //!< Maximum number of workers which will connect to this coordinator.
101 fr_coord_reg_t *coord_reg; //!< Coordinator registration details.
102 fr_coord_t *coord; //!< The coordinator data structure.
103 fr_sem_t *sem; //!< For inter-thread signaling.
105
106/** Control plane message used for workers attaching / detaching to coordinators
107 */
108typedef struct {
109 int32_t worker; //!< Worker ID
110 fr_control_t *worker_recv_control; //!< Control plane to send messages to this worker
111 fr_atomic_queue_t *worker_recv_aq; //!< Atomic queue to send data to this worker
113
114typedef struct {
115 int32_t worker; //!< Worker ID
116 bool exiting; //!< Is the server exiting
118
119/** Compare coordinators by registration
120 */
121static int8_t coord_cmp(void const *one, void const *two)
122{
123 fr_coord_t const *a = one, *b = two;
124
125 return CMP(a->coord_reg, b->coord_reg);
126}
127
128/** Register a coordinator
129 *
130 * To be called from mod_instantiate of a module which uses a coordinator
131 *
132 * @param ctx to allocate registration under
133 * @param reg_ctx Registration data
134 * @return
135 * - coordinator registration on success
136 * - NULL on failure
137 */
139{
140 fr_coord_reg_t *coord_reg;
141
142 fr_assert(reg_ctx->coord_cb);
143 fr_assert(reg_ctx->worker_cb);
144 fr_assert(reg_ctx->mi);
145
146 /* Allocate the list of registered coordinators if not already done */
147 if (!coord_regs) {
148 MEM(coord_regs = talloc_zero(NULL, fr_dlist_head_t));
150 }
151
152 MEM(coord_reg = talloc(ctx, fr_coord_reg_t));
153 *coord_reg = (fr_coord_reg_t) {
154 .name = reg_ctx->name,
155 .coord_cb = reg_ctx->coord_cb,
156 .worker_cb = reg_ctx->worker_cb,
157 .worker_send_size = reg_ctx->worker_send_size ? reg_ctx->worker_send_size : 4096,
158 .coord_send_size = reg_ctx->coord_send_size ? reg_ctx->coord_send_size : 4096,
159 .mi = reg_ctx->mi,
160 };
161
163
164 return coord_reg;
165}
166
167/** De-register a coordinator
168 *
169 * To be called from mod_detach of a module which uses a coordinator
170 *
171 * When running in threaded mode, will wait for the coordinator to exit.
172 *
173 * @param coord_reg to de-register
174 */
176{
177 int ret;
178
179 fr_dlist_remove(coord_regs, coord_reg);
180
181 /*
182 * In single threaded mode just free the registration.
183 */
184 if (!coord_threads) goto free;
185
187 if (sc->coord_reg == coord_reg) {
188 if ((ret = pthread_join(sc->thread.pthread_id, NULL)) != 0) {
189 ERROR("Failed joining coordinator %s: %s", coord_reg->name, fr_syserror(ret));
190 } else {
191 DEBUG2("Coordinator %s joined (cleaned up)", coord_reg->name);
192 }
193
196 break;
197 }
198 }
199
200free:
201 talloc_free(coord_reg);
202
203 if (fr_dlist_num_elements(coord_regs) == 0) TALLOC_FREE(coord_regs);
204}
205
206/** Callback for a coordinator receiving data from a worker
207 */
208static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
209{
210 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
212 fr_coord_data_t *cd;
213 fr_dbuff_t dbuff;
214
215 fr_assert(data_size == sizeof(cm));
216 memcpy(&cm, data, data_size);
217 fr_assert((cm.worker >= MIN_WORKER_ID) && (cm.worker < (int32_t)coord->max_workers));
218
219 if (unlikely(!fr_atomic_queue_pop(coord->coord_recv_aq, (void **)&cd))) return;
220
221 DEBUG3("Coordinator %s got data from worker %d for callback %d",
222 coord->coord_reg->name, cm.worker, cd->coord_cb_id);
223
224 if (cd->coord_cb_id >= coord->num_callbacks) {
225 ERROR("Received data for callback %d which is not defined", cd->coord_cb_id);
226 fr_message_done(&cd->m);
227 return;
228 }
229
230 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
231 coord->callbacks[cd->coord_cb_id].callback(coord, cm.worker, &dbuff, now,
232 coord->cb_inst[cd->coord_cb_id] ?
233 coord->cb_inst[cd->coord_cb_id]->inst_data : NULL,
234 coord->callbacks[cd->coord_cb_id].uctx);
235 fr_message_done(&cd->m);
236}
237
238/** Callback for a worker receiving data from a coordinator
239 */
240static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
241{
242 fr_coord_worker_t *cw = talloc_get_type_abort(ctx, fr_coord_worker_t);
244 fr_coord_data_t *cd;
245 fr_dbuff_t dbuff;
246
247 fr_assert(data_size == sizeof(cm));
248 memcpy(&cm, data, data_size);
249
250 if (unlikely(!fr_atomic_queue_pop(cw->worker_recv_aq, (void **)&cd))) return;
251
252 DEBUG3("Coordinator %s sent message for callback %d", cw->coord->coord_reg->name, cd->coord_cb_id);
253
254 if (cd->coord_cb_id >= cw->num_callbacks) {
255 ERROR("Received message for callback %d which is not defined", cd->coord_cb_id);
256 fr_message_done(&cd->m);
257 return;
258 }
259
260 fr_dbuff_init(&dbuff, (uint8_t const *)cd->m.data, cd->m.data_size);
261 cw->callbacks[cd->coord_cb_id].callback(cw, &dbuff, now,
263 module_thread(cw->coord->coord_reg->mi)->data, NULL, NULL),
264 cw->callbacks[cd->coord_cb_id].uctx);
265 fr_message_done(&cd->m);
266}
267
268/** Callback run by a coordinator when a worker attaches
269 */
270static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
271{
272 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
274 fr_coord_msg_t ack;
275 uint32_t thread_id;
276
277 fr_assert(data_size == sizeof(fr_coord_worker_attach_msg_t));
278 fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
279
280 DEBUG2("Worker %d attached to %s", msg->worker, coord->coord_reg->name);
281 coord->num_workers++;
282 thread_id = msg->worker - MIN_WORKER_ID;
283 coord->coord_send_control[thread_id] = msg->worker_recv_control;
284 coord->coord_send_aq[thread_id] = msg->worker_recv_aq;
285
286 ack.worker = msg->worker;
287 fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
288 FR_CONTROL_ID_COORD_WORKER_ACK, &ack, sizeof(ack));
289}
290
291/** Callback run by a coordinator when a worker detaches
292 */
293static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
294{
295 fr_coord_t *coord = talloc_get_type_abort(ctx, fr_coord_t);
297 fr_coord_msg_t ack;
298 uint32_t thread_id;
299
300 fr_assert(data_size == sizeof(fr_coord_worker_detach_msg_t));
301 fr_assert((msg->worker >= MIN_WORKER_ID) && (msg->worker < (int32_t)coord->max_workers));
302 thread_id = msg->worker - MIN_WORKER_ID;
303
304 DEBUG2("Worker %d detached from %s", msg->worker, coord->coord_reg->name);
305 coord->num_workers--;
306
307 ack.worker = msg->worker;
308 fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
310
311 coord->coord_send_control[thread_id] = NULL;
312 coord->coord_send_aq[thread_id] = NULL;
313 if (msg->exiting) coord->exiting = true;
314}
315
316/** Create a coordinator from its registration
317 *
318 * @param ctx to allocate the coordinator in
319 * @param el Event list to run this coordinator
320 * @param coord_reg Registration to configure this coordinator
321 * @param single_thread Is the server in single thread mode
322 * @param max_workers The maximum number of workers which will attach
323 * @return
324 * - the coordinator on success
325 * - NULL on failure
326 */
327static fr_coord_t *fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg,
328 bool single_thread, uint32_t max_workers)
329{
330 fr_coord_t *coord;
331 uint32_t i, num_threads = max_workers - MIN_WORKER_ID;
332 fr_coord_cb_reg_t *cb = coord_reg->coord_cb;
334
335 MEM(coord = talloc(ctx, fr_coord_t));
336 *coord = (fr_coord_t) {
337 .el = el,
338 .coord_reg = coord_reg,
339 .single_thread = single_thread,
340 .max_workers = max_workers
341 };
342
343 /* Allocate atomic queue / control for receiving messages from workers */
345 if (!aq) {
346 fr_strerror_const("Failed creating worker -> coordinator atomic queue");
347 fail:
348 talloc_free(coord);
349 return NULL;
350 }
351 coord->coord_recv_control = fr_control_create(coord, el, aq, 5);
352 if (!coord->coord_recv_control) {
353 fr_strerror_const("Failed creating worker -> coordinator control plane");
354 goto fail;
355 }
356
357 /* Allocate atomic queue for workers sending data to coordinators */
359 if (!coord->coord_recv_aq) {
360 fr_strerror_const("Failed creating worker -> coordinator data atomic queue");
361 goto fail;
362 }
363
365 coord, coord_worker_attach) < 0) goto fail;
367 coord, coord_worker_detach) < 0) goto fail;
369 coord, coord_data_recv) < 0) goto fail;
370
371 /* Count the number of callbacks defined, for sanity checking messages */
372 while (cb->callback) {
373 coord->num_callbacks++;
374 cb++;
375 }
376 coord->callbacks = coord_reg->coord_cb;
377
378 if (fr_control_open(coord->coord_recv_control) < 0) {
379 fr_strerror_const("Failed opening control plane");
380 goto fail;
381 }
382
383 /*
384 * Coordinator side arrays for holding pointers to worker
385 * specific communication structures. The array sizes are the
386 * number of threads expected to attach which is the number of
387 * workers plus any additional threads, currently just the main
388 * thread (worker id -1)
389 */
390 MEM(coord->coord_send_rb = talloc_array(coord, fr_ring_buffer_t *, num_threads));
391 MEM(coord->coord_send_ms = talloc_array(coord, fr_message_set_t *, num_threads));
392 for (i = 0; i < num_threads; i++) {
394 if (!coord->coord_send_rb[i]) goto fail;
395
397 coord_reg->coord_send_size, true);
398 if (!coord->coord_send_ms[i]) goto fail;
399 }
400 MEM(coord->coord_send_control = talloc_zero_array(coord, fr_control_t *, num_threads));
401 MEM(coord->coord_send_aq = talloc_zero_array(coord, fr_atomic_queue_t *, num_threads));
402
403 MEM(coord->cb_inst = talloc_zero_array(coord, fr_coord_cb_inst_t *, coord->num_callbacks));
404
405 for (i = 0; i < coord->num_callbacks; i++) {
406 if (!coord->callbacks[i].inst_create) continue;
407 coord->cb_inst[i] = coord->callbacks[i].inst_create(coord, coord, coord->el, coord->single_thread,
408 coord->callbacks[i].uctx);
409 if (!coord->cb_inst[i]) goto fail;
410 }
411
412 return coord;
413}
414
415static void fr_coord_destroy(fr_coord_t *coord){
416 uint32_t i;
417
418 for (i = 0; i < coord->num_callbacks; i++) {
419 if (!coord->callbacks[i].inst_destroy) continue;
420 coord->callbacks[i].inst_destroy(coord, coord->cb_inst[i], coord->single_thread,
421 coord->callbacks[i].uctx);
422 }
423}
424
425/** Run the event loop for a coordinator thread when in multi-threaded mode
426 */
427static void fr_coordinate(fr_coord_t *coord)
428{
429 uint32_t i;
430 fr_coord_cb_inst_t *cb_inst;
431
432 /*
433 * Run until we're told to exit AND the number of
434 * workers has dropped to zero.
435 *
436 * Whenever a worker detaches, coord->num_workers
437 * is decremented, so when coord->num_workers == 0,
438 * all workers have detached and are no longer using
439 * the channel.
440 */
441 while (likely(!(coord->exiting && (coord->num_workers == 0)))) {
442 int num_events;
443
444 /*
445 * Check the event list. If there's an error
446 * (e.g. exit), we stop looping and clean up.
447 */
448 DEBUG4("Gathering events");
449 num_events = fr_event_corral(coord->el, fr_time(), true);
450 DEBUG4("%u event(s) pending%s",
451 num_events == -1 ? 0 : num_events, num_events == -1 ? " - event loop exiting" : "");
452 if (num_events < 0) break;
453
454 /*
455 * Service outstanding events.
456 */
457 if (num_events > 0) {
458 DEBUG4("Servicing event(s)");
459 fr_event_service(coord->el);
460 }
461
462 /*
463 * Run any registered instance specific event callbacks
464 */
465 for (i = 0; i < coord->num_callbacks; i++) {
466 cb_inst = coord->cb_inst[i];
467 if (cb_inst && cb_inst->event_cb) cb_inst->event_cb(coord->el, cb_inst->inst_data);
468 }
469 }
470
471 fr_coord_destroy(coord);
472
473 return;
474}
475
476/** Entry point for a coordinator thread
477 */
478static void *fr_coordinate_thread(void *arg)
479{
480 fr_schedule_coord_t *sc = talloc_get_type_abort(arg, fr_schedule_coord_t);
481 fr_coord_reg_t *coord_reg = sc->coord_reg;
483 char coordinate_name[64];
484
485 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinate %s", coord_reg->name);
486
487 if (fr_thread_setup(&sc->thread, coordinate_name) < 0) goto fail;
488
489 sc->coord = fr_coord_create(sc->thread.ctx, sc->thread.el, coord_reg, false, sc->max_workers);
490 if (!sc->coord) {
491 PERROR("%s - Failed creating coordinator thread", coordinate_name);
492 goto fail;
493 }
494
495 /*
496 * Create all the thread specific data for the coordinator thread
497 */
498 if (fr_thread_instantiate(sc->thread.ctx, sc->thread.el) < 0) goto fail;
499
500 /*
501 * Tell the originator that the thread has started.
502 */
503 fr_thread_start(&sc->thread, sc->sem);
504
505 fr_coordinate(sc->coord);
506
507 status = FR_THREAD_EXITED;
508
509fail:
511
512 fr_thread_exit(&sc->thread, status, sc->sem);
513
514 return NULL;
515}
516
517/** Start all registered coordinator threads in multi-threaded mode
518 *
519 * @param num_workers The number of workers which will be attaching
520 * @param sem Semaphore used to signal that the threads are ready
521 * @return
522 * - 0 on success
523 * - -1 on failure
524 */
526{
527 int num = 0;
528
529 if (!coord_regs) return 0;
530
531 MEM(coord_threads = talloc(NULL, fr_dlist_head_t));
534
537
538 MEM(sc = talloc_zero(coord_threads, fr_schedule_coord_t));
539
540 sc->thread.id = num++;
541 sc->coord_reg = coord_reg;
542 sc->max_workers = num_workers;
543 sc->sem = sem;
544
545 if (fr_thread_create(&sc->thread.pthread_id, fr_coordinate_thread, sc) < 0) {
547 PERROR("Failed creating coordinator %s", coord_reg->name);
548 return -1;
549 };
550
552 }
553
554 /*
555 * Wait for all the coordinators to start.
556 */
557 if (fr_thread_wait_list(sem, coord_threads) < 0) {
558 ERROR("Failed creating coordinator threads");
559 return -1;
560 }
561
562 /*
563 * Insert the coordinators in the tree
564 */
566 fr_assert(sc->coord);
567 fr_rb_insert(&coords, sc->coord);
568 }
569
570 return 0;
571}
572
573/** Clean up coordinators in single threaded mode
574 */
576{
577 fr_coord_t *coord;
579
580 if (fr_rb_num_elements(&coords) == 0) return;
581
582 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
583 coord;
584 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
586 fr_coord_destroy(coord);
587 talloc_free(coord);
588 }
589}
590
591/** Start coordinators in single threaded mode
592 */
593int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
594{
595 if (!coord_regs) return 0;
596
598
600 char coordinate_name[64];
601 fr_coord_t *coord;
602
603 snprintf(coordinate_name, sizeof(coordinate_name), "Coordinator %s", coord_reg->name);
604
605 INFO("%s - Starting", coordinate_name);
606
607 coord = fr_coord_create(ctx, el, coord_reg, true, 1);
608 if (!coord) {
609 PERROR("%s - Failed creating coordinator thread", coordinate_name);
610 return -1;
611 }
612
613 fr_rb_insert(&coords, coord);
614 }
615
616 return 0;
617}
618
619/** Signal a coordinator that a worker wants to detach
620 *
621 * @param cw Worker which is detaching.
622 * @param exiting Is the server exiting.
623 */
624int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
625{
627
628 msg = talloc(cw, fr_coord_worker_detach_msg_t);
629 msg->worker = fr_schedule_worker_id();
630 msg->exiting = exiting;
631
634 msg, sizeof(fr_coord_worker_detach_msg_t)) < 0) return -1;
635
637
638 return 0;
639}
640
641/** A worker got an ack from a coordinator in response to attach / detach
642 */
643static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size,
644 UNUSED fr_time_t now)
645{
646#ifndef NDEBUG
647 fr_coord_msg_t const *cm = data;
648
649 fr_assert(data_size == sizeof(fr_coord_msg_t));
651#endif
652}
653
654/** Attach a worker to a coordinator
655 *
656 * @param ctx To allocate worker structure in
657 * @param el Event list for control messages
658 * @param coord_reg Coordinator registration to attach to.
659 * @return
660 * - Worker structure for coordinator use on success
661 * - NULL on failure
662 */
664{
666 fr_coord_worker_cb_reg_t *cb_reg = coord_reg->worker_cb;
668 fr_coord_t find;
670
671 cw = talloc_zero(ctx, fr_coord_worker_t);
672
673 find = (fr_coord_t) {
674 .coord_reg = coord_reg
675 };
676 cw->coord = fr_rb_find(&coords, &find);
677 if (!cw->coord) {
678 fail:
679 talloc_free(cw);
680 return NULL;
681 }
682
683 aq = fr_atomic_queue_alloc(cw, 1024);
688 coord_reg->worker_send_size, true);
689
690 while (cb_reg->callback) {
691 cw->num_callbacks++;
692 cb_reg++;
693 }
694 cw->callbacks = coord_reg->worker_cb;
695
697 cw, coordinate_worker_ack) < 0) goto fail;
699 cw, coord_worker_data_recv) < 0) goto fail;
700
701 if (fr_control_open(cw->worker_recv_control) < 0) goto fail;
702
703 msg.worker_recv_control = cw->worker_recv_control;
704 msg.worker_recv_aq = cw->worker_recv_aq;
705 msg.worker = fr_schedule_worker_id();
706
709 &msg, sizeof(fr_coord_worker_attach_msg_t)) < 0) goto fail;
710
712
713 return cw;
714}
715
716/** Send generic data from a coordinator to a worker
717 *
718 * @param coord Coordinator which is sending the data.
719 * @param worker_id Worker to send data to.
720 * @param cb_id Callback ID for the worker to run.
721 * @param dbuff Buffer containing data to send.
722 * @return
723 * - 0 on success
724 * - <0 on failure
725 */
727{
729 fr_coord_data_t *cd = NULL;
730 uint32_t thread_id = worker_id - MIN_WORKER_ID;
731
732 fr_assert((worker_id >= MIN_WORKER_ID) && (worker_id < (int32_t)coord->max_workers));
733
734 cm = (fr_coord_msg_t) {
736 };
737
738 cd = (fr_coord_data_t *)fr_message_alloc(coord->coord_send_ms[thread_id], (fr_message_t *)cd,
739 fr_dbuff_used(dbuff));
740 if (!cd) return -1;
741
742 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
743 cd->coord_cb_id = cb_id;
744 if (!fr_atomic_queue_push(coord->coord_send_aq[thread_id], cd)) {
746 return -1;
747 }
748 return fr_control_message_send(coord->coord_send_control[thread_id], coord->coord_send_rb[thread_id],
750 &cm, sizeof(fr_coord_msg_t));
751}
752
753/** Broadcast data from a coordinator to all workers
754 *
755 * @param coord Coordinator which is sending the data.
756 * @param cb_id Callback ID for the workers to run.
757 * @param dbuff Buffer containing data to send.
758 * @return
759 * - 0 on success
760 * - <0 on failure - indicating the number of sends which failed.
761 */
763{
764 uint32_t i;
765 int failed = 0;
766
767 for (i = 0; i < coord->max_workers; i++) {
768 if (!coord->coord_send_control[i - MIN_WORKER_ID]) continue;
769 if (fr_coord_to_worker_send(coord, i, cb_id, dbuff) < 0) failed++;
770 }
771
772 return 0 - failed;
773}
774
775/** Send data from a worker to a coordinator
776 *
777 * @param cw Worker side of coordinator sending the data.
778 * @param cb_id Callback ID for the coordinator to run.
779 * @param dbuff Buffer containing data to send.
780 * @return
781 * - 0 on success
782 * - < 0 on failure
783 */
785{
787 fr_coord_data_t *cd = NULL;
788
789 cm = (fr_coord_msg_t) {
791 };
792
794 if (!cd) return -1;
795
796 memcpy(cd->m.data, fr_dbuff_buff(dbuff), fr_dbuff_used(dbuff));
797 cd->coord_cb_id = cb_id;
800 return -1;
801 }
802
805}
806
807/** Insert instance specific pre-event callbacks
808 */
810{
811 fr_coord_t *coord;
813 fr_coord_cb_inst_t *cb_inst;
814 uint32_t i;
815
816 if (!coord_regs) return 0;
817
818 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
819 coord != NULL;
820 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
821 for (i = 0; i < coord->num_callbacks; i++) {
822 cb_inst = coord->cb_inst[i];
823 if (cb_inst && cb_inst->event_pre_cb &&
824 fr_event_pre_insert(el, cb_inst->event_pre_cb, cb_inst->inst_data) < 0) {
825 return -1;
826 }
827 }
828 }
829 return 0;
830}
831
832/** Insert instance specific post-event callbacks
833 */
835{
836 fr_coord_t *coord;
838 fr_coord_cb_inst_t *cb_inst;
839 uint32_t i;
840
841 if (!coord_regs) return 0;
842
843 for (coord = fr_rb_iter_init_inorder(&coords, &iter);
844 coord != NULL;
845 coord = fr_rb_iter_next_inorder(&coords, &iter)) {
846 for (i = 0; i < coord->num_callbacks; i++) {
847 cb_inst = coord->cb_inst[i];
848 if (cb_inst && cb_inst->event_post_cb &&
849 fr_event_post_insert(el, cb_inst->event_post_cb, cb_inst->inst_data) < 0) {
850 return -1;
851 }
852 }
853 }
854 return 0;
855}
log_entry msg
Definition acutest.h:794
bool fr_atomic_queue_pop(fr_atomic_queue_t *aq, void **p_data)
Pop a pointer from the atomic queue.
fr_atomic_queue_t * fr_atomic_queue_alloc(TALLOC_CTX *ctx, size_t size)
Create fixed-size atomic queue.
bool fr_atomic_queue_push(fr_atomic_queue_t *aq, void *data)
Push a pointer into the atomic queue.
Structure to hold the atomic queue.
#define RCSID(id)
Definition build.h:512
#define NDEBUG_UNUSED
Definition build.h:347
#define CMP(_a, _b)
Same as CMP_PREFER_SMALLER use when you don't really care about ordering, you just want an ordering.
Definition build.h:113
#define unlikely(_x)
Definition build.h:407
#define UNUSED
Definition build.h:336
#define FR_CONTROL_MAX_SIZE
Definition control.h:51
#define FR_CONTROL_MAX_MESSAGES
Definition control.h:50
static size_t num_workers
static fr_atomic_queue_t ** aq
fr_dlist_t entry
Entry in list of registrations.
Definition coord.c:87
static void coord_worker_attach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker attaches.
Definition coord.c:270
fr_atomic_queue_t * worker_recv_aq
Atomic queue for coordinator -> worker messages.
Definition coord.c:78
uint32_t max_workers
Maximum number of workers which will connect to this coordinator.
Definition coord.c:100
fr_control_t ** coord_send_control
Control planes for coordinator -> worker messages.
Definition coord.c:63
#define FR_CONTROL_ID_COORD_WORKER_DETACH
Message sent from worker to detach from a coordinator.
Definition coord.c:37
static void coordinate_worker_ack(UNUSED void *ctx, NDEBUG_UNUSED void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
A worker got an ack from a coordinator in response to attach / detach.
Definition coord.c:643
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:101
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.c:89
int fr_coords_create(TALLOC_CTX *ctx, fr_event_list_t *el)
Start coordinators in single threaded mode.
Definition coord.c:593
fr_coord_reg_t * fr_coord_register(TALLOC_CTX *ctx, fr_coord_reg_ctx_t *reg_ctx)
Register a coordinator.
Definition coord.c:138
fr_control_t * worker_recv_control
Control plane to send messages to this worker.
Definition coord.c:110
static fr_dlist_head_t * coord_threads
Definition coord.c:44
static fr_coord_t * fr_coord_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg, bool single_thread, uint32_t max_workers)
Create a coordinator from its registration.
Definition coord.c:327
fr_coord_worker_t * fr_coord_attach(TALLOC_CTX *ctx, fr_event_list_t *el, fr_coord_reg_t *coord_reg)
Attach a worker to a coordinator.
Definition coord.c:663
fr_coord_reg_t * coord_reg
Coordinator registration details.
Definition coord.c:50
size_t worker_send_size
Initial size for worker -> coordinator ring buffer.
Definition coord.c:90
size_t coord_send_size
Initial size for coordinator -> worker ring buffer.
Definition coord.c:91
int32_t worker
Worker ID.
Definition coord.c:109
#define MIN_WORKER_ID
The minimum value we expect as worker id. -1 is the main thread.
Definition coord.c:41
int fr_coord_to_worker_send(fr_coord_t *coord, int32_t worker_id, uint32_t cb_id, fr_dbuff_t *dbuff)
Send generic data from a coordinator to a worker.
Definition coord.c:726
fr_ring_buffer_t ** coord_send_rb
Ring buffers for coordinator -> worker control messages.
Definition coord.c:62
uint32_t max_workers
Maximum number of workers we expect.
Definition coord.c:57
bool exiting
Is the server exiting.
Definition coord.c:116
fr_atomic_queue_t * worker_recv_aq
Atomic queue to send data to this worker.
Definition coord.c:111
static void coord_worker_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a worker receiving data from a coordinator.
Definition coord.c:240
static void coord_worker_detach(void *ctx, void const *data, NDEBUG_UNUSED size_t data_size, UNUSED fr_time_t now)
Callback run by a coordinator when a worker detaches.
Definition coord.c:293
fr_coord_t * coord
Coordinator this worker is related to.
Definition coord.c:74
fr_coord_cb_inst_t ** cb_inst
Array of callback instance specific data.
Definition coord.c:55
void fr_coord_deregister(fr_coord_reg_t *coord_reg)
De-register a coordinator.
Definition coord.c:175
void fr_coords_destroy(void)
Clean up coordinators in single threaded mode.
Definition coord.c:575
static int8_t coord_cmp(void const *one, void const *two)
Compare coordinators by registration.
Definition coord.c:121
fr_control_t * coord_recv_control
Control plane for worker -> coordinator messages.
Definition coord.c:60
#define FR_CONTROL_ID_COORD_WORKER_ATTACH
Message sent from worker to attach to a coordinator.
Definition coord.c:36
int fr_worker_to_coord_send(fr_coord_worker_t *cw, uint32_t cb_id, fr_dbuff_t *dbuff)
Send data from a worker to a coordinator.
Definition coord.c:784
static fr_dlist_head_t * coord_regs
Definition coord.c:43
module_instance_t const * mi
Module instance which registered this coordinator.
Definition coord.c:92
fr_coord_cb_reg_t * callbacks
Array of callbacks for worker -> coordinator messages.
Definition coord.c:53
int fr_coord_post_event_insert(fr_event_list_t *el)
Insert instance specific post-event callbacks.
Definition coord.c:834
int fr_coord_pre_event_insert(fr_event_list_t *el)
Insert instance specific pre-event callbacks.
Definition coord.c:809
uint32_t num_callbacks
Number of callbacks registered.
Definition coord.c:80
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.c:88
fr_atomic_queue_t * coord_recv_aq
Atomic queue for worker -> coordinator.
Definition coord.c:61
static void * fr_coordinate_thread(void *arg)
Entry point for a coordinator thread.
Definition coord.c:478
static void coord_data_recv(void *ctx, void const *data, size_t data_size, fr_time_t now)
Callback for a coordinator receiving data from a worker.
Definition coord.c:208
fr_atomic_queue_t ** coord_send_aq
Atomic queues for coordinator -> worker data.
Definition coord.c:65
uint32_t num_callbacks
Number of callbacks defined.
Definition coord.c:54
bool exiting
Is this coordinator shutting down.
Definition coord.c:67
fr_rb_node_t node
Entry in the tree of coordinators.
Definition coord.c:52
fr_coord_worker_cb_reg_t * callbacks
Callbacks for coordinator -> worker messages.
Definition coord.c:79
fr_event_list_t * el
Coordinator event list.
Definition coord.c:51
int fr_coord_start(uint32_t num_workers, fr_sem_t *sem)
Start all registered coordinator threads in multi-threaded mode.
Definition coord.c:525
#define FR_CONTROL_ID_COORD_WORKER_ACK
Message sent to worker to acknowledge attach / detach.
Definition coord.c:38
#define FR_CONTROL_ID_COORD_DATA
Worker <-> coordinator message to pass data to a callback.
Definition coord.c:39
fr_message_set_t ** coord_send_ms
Message sets for coordinator -> worker data.
Definition coord.c:64
fr_thread_t thread
common thread information - must be first!
Definition coord.c:98
static void fr_coordinate(fr_coord_t *coord)
Run the event loop for a coordinator thread when in multi-threaded mode.
Definition coord.c:427
fr_control_t * worker_recv_control
Coordinator -> worker control plane.
Definition coord.c:77
bool single_thread
Are we in single thread mode.
Definition coord.c:68
char const * name
Name for debugging.
Definition coord.c:86
int fr_coord_to_worker_broadcast(fr_coord_t *coord, uint32_t cb_id, fr_dbuff_t *dbuff)
Broadcast data from a coordinator to all workers.
Definition coord.c:762
int fr_coord_detach(fr_coord_worker_t *cw, bool exiting)
Signal a coordinator that a worker wants to detach.
Definition coord.c:624
int32_t worker
Worker ID.
Definition coord.c:115
fr_ring_buffer_t * worker_send_rb
Ring buffer for worker -> coordinator control plane.
Definition coord.c:75
fr_coord_t * coord
The coordinator data structure.
Definition coord.c:102
static fr_rb_tree_t coords
Definition coord.c:45
uint32_t num_workers
How many workers are attached.
Definition coord.c:58
fr_message_set_t * worker_send_ms
Message set for worker -> coordinator messages.
Definition coord.c:76
static void fr_coord_destroy(fr_coord_t *coord)
Definition coord.c:415
fr_sem_t * sem
For inter-thread signaling.
Definition coord.c:103
A coordinator registration.
Definition coord.c:85
A coordinator which receives messages from workers.
Definition coord.c:49
Control plane message used for workers attaching / detaching to coordinators.
Definition coord.c:108
The worker end of worker <-> coordinator communication.
Definition coord.c:73
Scheduler specific information for coordinator threads.
Definition coord.c:97
fr_coord_cb_t callback
Definition coord.h:48
fr_coord_cb_reg_t * coord_cb
Callbacks for worker -> coordinator messages.
Definition coord.h:64
struct fr_coord_s fr_coord_t
Definition coord.h:37
void * uctx
Definition coord.h:51
char const * name
Name for this coordinator.
Definition coord.h:63
module_instance_t const * mi
Module instance registering this coordinator.
Definition coord.h:70
fr_coord_worker_cb_t callback
Definition coord.h:56
size_t coord_send_size
Initial ring buffer size for coordinator -> worker data.
Definition coord.h:68
struct fr_coord_reg_s fr_coord_reg_t
Definition coord.h:35
fr_coord_worker_cb_reg_t * worker_cb
Callbacks for coordinator -> worker messages.
Definition coord.h:65
size_t worker_send_size
Initial ring buffer size for worker -> coordinator data.
Definition coord.h:66
fr_coord_cb_inst_create_t inst_create
Definition coord.h:49
fr_coord_cb_inst_destroy_t inst_destroy
Definition coord.h:50
fr_coord_inst_event_cb_t event_cb
Event callback in multi thread mode.
Definition coord_priv.h:48
void * inst_data
Instance data.
Definition coord_priv.h:45
int32_t worker
Worker ID.
Definition coord_priv.h:33
uint32_t coord_cb_id
Callback ID for this message.
Definition coord_priv.h:40
fr_event_status_cb_t event_pre_cb
Pre-event callback in single thread mode.
Definition coord_priv.h:46
fr_event_post_cb_t event_post_cb
Post-event callback in single thread mode.
Definition coord_priv.h:47
fr_message_t m
Message containing data being sent.
Definition coord_priv.h:39
List / data message used between workers and coordinators.
Definition coord_priv.h:38
Generic control message used between workers and coordinators.
Definition coord_priv.h:32
#define fr_dbuff_used(_dbuff_or_marker)
Return the number of bytes remaining between the start of the dbuff or marker and the current position.
Definition dbuff.h:775
#define fr_dbuff_init(_out, _start, _len_or_end)
Initialise a dbuff for encoding or decoding.
Definition dbuff.h:362
#define fr_dbuff_buff(_dbuff_or_marker)
Return the underlying buffer of a dbuff or marker.
Definition dbuff.h:890
#define MEM(x)
Definition debug.h:46
#define ERROR(fmt,...)
Definition dhcpclient.c:40
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
#define fr_dlist_foreach(_list_head, _type, _iter)
Iterate over the contents of a list.
Definition dlist.h:98
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static int fr_dlist_insert_tail(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the tail of a list.
Definition dlist.h:360
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
free(array)
talloc_free(hp)
fr_control_t * fr_control_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_atomic_queue_t *aq, size_t num_callbacks)
Create a control-plane signaling path.
Definition control.c:152
int fr_control_callback_add(fr_control_t **c, uint32_t id, void *ctx, fr_control_callback_t callback)
Register a callback for an ID.
Definition control.c:443
int fr_control_open(fr_control_t *c)
Open the control-plane signalling path.
Definition control.c:176
int fr_control_message_send(fr_control_t *c, fr_ring_buffer_t *rb, uint32_t id, void *data, size_t data_size)
Send a control-plane message.
Definition control.c:355
void fr_control_wait(fr_control_t *c)
Wait for a control plane to become readable.
Definition control.c:542
The control structure.
Definition control.c:76
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
#define DEBUG4(_fmt,...)
Definition log.h:267
void fr_event_service(fr_event_list_t *el)
Service any outstanding timer or file descriptor events.
Definition event.c:2177
int fr_event_corral(fr_event_list_t *el, fr_time_t now, bool wait)
Gather outstanding timer and file descriptor events.
Definition event.c:2045
#define fr_time()
Definition event.c:60
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1953
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2000
Stores all information relating to an event list.
Definition event.c:377
unsigned int uint32_t
unsigned char uint8_t
fr_message_set_t * fr_message_set_create(TALLOC_CTX *ctx, int num_messages, size_t message_size, size_t ring_buffer_size, bool unlimited_size)
Create a message set.
Definition message.c:127
int fr_message_done(fr_message_t *m)
Mark a message as done.
Definition message.c:195
fr_message_t * fr_message_alloc(fr_message_set_t *ms, fr_message_t *m, size_t actual_packet_size)
Allocate packet data for a message.
Definition message.c:1000
A Message set, composed of message headers and ring buffer data.
Definition message.c:94
uint8_t * data
pointer to the data in the ring buffer
Definition message.h:49
size_t data_size
size of the data in the ring buffer
Definition message.h:50
#define MODULE_CTX(_mi, _thread, _env_data, _rctx)
Wrapper to create a module_ctx_t as a compound literal.
Definition module_ctx.h:128
#define fr_assert(_expr)
Definition rad_assert.h:37
#define DEBUG2(fmt,...)
#define INFO(fmt,...)
Definition radict.c:63
uint32_t fr_rb_num_elements(fr_rb_tree_t *tree)
Return how many nodes there are in a tree.
Definition rb.c:781
void * fr_rb_iter_init_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Initialise an in-order iterator.
Definition rb.c:824
void fr_rb_iter_delete_inorder(fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Remove the current node from the tree.
Definition rb.c:899
void * fr_rb_find(fr_rb_tree_t const *tree, void const *data)
Find an element in the tree, returning the data, not the node.
Definition rb.c:577
bool fr_rb_insert(fr_rb_tree_t *tree, void const *data)
Insert data into a tree.
Definition rb.c:626
void * fr_rb_iter_next_inorder(UNUSED fr_rb_tree_t *tree, fr_rb_iter_inorder_t *iter)
Return the next node.
Definition rb.c:850
struct fr_rb_tree_s fr_rb_tree_t
Definition rb.h:51
#define fr_rb_inline_talloc_init(_tree, _type, _field, _data_cmp, _data_free)
Initialises a red black tree that verifies elements are of a specific talloc type.
Definition rb.h:155
uint32_t num_elements
How many elements are inside the tree.
Definition rb.h:95
Iterator structure for in-order traversal of an rbtree.
Definition rb.h:319
The main red black tree structure.
Definition rb.h:71
fr_ring_buffer_t * fr_ring_buffer_create(TALLOC_CTX *ctx, size_t size)
Create a ring buffer.
Definition ring_buffer.c:63
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:104
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
Definition schedule.c:110
sem_t fr_sem_t
Definition semaphore.h:53
void * data
Thread specific instance data.
Definition module.h:374
static module_thread_instance_t * module_thread(module_instance_t const *mi)
Retrieve module/thread specific instance for a module.
Definition module.h:503
Module instance data.
Definition module.h:287
static const uchar sc[16]
Definition smbdes.c:115
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
void fr_thread_start(fr_thread_t *thread, fr_sem_t *sem)
Signal the parent that we're done.
Definition thread.c:203
int fr_thread_wait_list(fr_sem_t *sem, fr_dlist_head_t *head)
Wait for multiple threads to signal readiness via a semaphore.
Definition thread.c:79
int fr_thread_create(pthread_t *thread, fr_thread_entry_t func, void *arg)
Create a joinable thread.
Definition thread.c:46
int fr_thread_setup(fr_thread_t *out, char const *name)
Common setup for child threads: block signals, allocate a talloc context, and create an event list.
Definition thread.c:112
int fr_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el)
Instantiate thread-specific data for modules, virtual servers, xlats, unlang, and TLS.
Definition thread.c:165
void fr_thread_detach(void)
Detach thread-specific data for modules, virtual servers, xlats.
Definition thread.c:190
void fr_thread_exit(fr_thread_t *thread, fr_thread_status_t status, fr_sem_t *sem)
Signal the parent that we're done.
Definition thread.c:218
fr_thread_status_t
Track the child thread status.
Definition thread.h:38
@ FR_THREAD_EXITED
exited, and in the exited queue
Definition thread.h:42
@ FR_THREAD_FAIL
failed, and in the exited queue
Definition thread.h:43
"server local" time.
Definition time.h:69
static fr_event_list_t * el
#define fr_strerror_const(_msg)
Definition strerror.h:223
static fr_slen_t data
Definition value.h:1340