The FreeRADIUS server $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
Loading...
Searching...
No Matches
schedule.c
Go to the documentation of this file.
1/*
2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15 */
16
17/**
18 * $Id: a9e807032cb1c4415ba5b1a8f13f87cd4098a537 $
19 *
20 * @brief Network / worker thread scheduling
21 * @file io/schedule.c
22 *
23 * @copyright 2016 Alan DeKok (aland@freeradius.org)
24 */
25RCSID("$Id: a9e807032cb1c4415ba5b1a8f13f87cd4098a537 $")
26
/*
 *	NOTE(review): presumably consumed by the ERROR()/INFO()/DEBUG*()/PERROR()
 *	logging macros as their log destination, so every function in this file
 *	which logs must have an `sc` (fr_schedule_t *) in scope - confirm.
 */
27#define LOG_DST sc->log
28
29#include <freeradius-devel/autoconf.h>
30
31#include <freeradius-devel/io/schedule.h>
32#include <freeradius-devel/util/dlist.h>
33#include <freeradius-devel/util/rb.h>
34#include <freeradius-devel/util/syserror.h>
35#include <freeradius-devel/server/trigger.h>
36
37#include <pthread.h>
38
/*
 *	Other OSes provide unnamed POSIX semaphores via sem_init().
 *	OS X doesn't, so on __APPLE__ the sem_*() calls used in this
 *	file are mapped onto Mach semaphores below.
 */
#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif

/** Initial count for the scheduler's semaphores
 *
 * The parent creates each semaphore with a count of zero, so every wait
 * blocks until a child thread posts.
 */
#define SEMAPHORE_LOCKED	(0)

#ifdef __APPLE__
#include <errno.h>
#include <mach/task.h>
#include <mach/mach_init.h>
#include <mach/semaphore.h>

/** Wait on a Mach semaphore using POSIX sem_wait() return conventions
 *
 * semaphore_wait() returns a kern_return_t and does not set errno.  An
 * interrupted wait returns KERN_ABORTED.  SEM_WAIT_INTR() decides whether
 * to retry by looking at errno, so translate the result here.  Otherwise a
 * stale errno could end the wait before the semaphore was actually acquired.
 *
 * @param[in] sem	Mach semaphore to wait on.
 * @return
 *	- 0 on success.
 *	- -1 on failure.  errno is set to EINTR if the wait was aborted,
 *	  or EINVAL for any other Mach error.
 */
static inline int fr_mach_sem_wait(semaphore_t *sem)
{
	kern_return_t kr = semaphore_wait(*sem);

	if (kr == KERN_SUCCESS) return 0;

	errno = (kr == KERN_ABORTED) ? EINTR : EINVAL;
	return -1;
}

#undef sem_t
#define sem_t semaphore_t
#undef sem_init
#define sem_init(s,p,c) semaphore_create(mach_task_self(),(s),SYNC_POLICY_FIFO,(c))
#undef sem_wait
#define sem_wait(s) fr_mach_sem_wait(s)
#undef sem_post
#define sem_post(s) semaphore_signal(*(s))
#undef sem_destroy
#define sem_destroy(s) semaphore_destroy(mach_task_self(),*(s))
#endif /* __APPLE__ */

/** Wait on a semaphore, retrying while the wait is interrupted by a signal
 *
 * Any other failure ends the loop without the semaphore having been acquired.
 */
#define SEM_WAIT_INTR(_x) do {if (sem_wait(_x) == 0) break;} while (errno == EINTR)
66
67/**
68 * Track the lifecycle status of a child (worker or network) thread.
 *
 * NOTE(review): the "running queue" / "exited queue" mentioned below do not
 * correspond to any list in this file.  Only the `workers` and `networks`
 * lists exist, and children which are not running are simply removed from
 * them.  These comments may be stale.
69 */
71 FR_CHILD_FREE = 0, //!< child is free
72 FR_CHILD_INITIALIZING, //!< initialized, but not running
73 FR_CHILD_RUNNING, //!< running, and in the running queue
74 FR_CHILD_EXITED, //!< exited, and in the exited queue
75 FR_CHILD_FAIL //!< failed, and in the exited queue
78/** Scheduler specific information for worker threads
79 *
80 * Wraps a fr_worker_t, tracking additional information that
81 * the scheduler uses.
82 */
83typedef struct {
84 TALLOC_CTX *ctx; //!< talloc ctx created by the worker thread; freed when the thread exits
85 fr_event_list_t *el; //!< event list the worker thread runs (allocated from ctx)
86 pthread_t pthread_id; //!< the thread of this worker
87
88 unsigned int id; //!< a unique ID; also used in the thread name and to find the "worker <id>" config subsection
89 int uses; //!< how many network threads are using it
90 fr_time_t cpu_time; //!< how much CPU time this worker has used
91
92 fr_dlist_t entry; //!< our entry into the linked list of workers
93
94 fr_schedule_t *sc; //!< the scheduler we are running under
95
96 fr_schedule_child_status_t status; //!< status of the worker
97 fr_worker_t *worker; //!< the worker data structure
99
100/** Scheduler specific information for network threads
101 *
102 * Wraps a fr_network_t, tracking additional information that
103 * the scheduler uses.
104 */
105typedef struct {
106 TALLOC_CTX *ctx; //!< talloc ctx created by the network thread; freed when the thread exits
107 pthread_t pthread_id; //!< the thread of this network
108
109 unsigned int id; //!< a unique ID
110
111 fr_dlist_t entry; //!< our entry into the linked list of networks
112
113 fr_schedule_t *sc; //!< the scheduler we are running under
114
115 fr_schedule_child_status_t status; //!< status of the network thread
116 fr_network_t *nr; //!< the network (receive) data structure
117
118 fr_timer_t *ev; //!< timer for periodic stats logging every stats_interval
120
121
122/**
123 * The scheduler.
 *
 * Owns the network and worker threads.  In single-threaded mode (el != NULL)
 * it instead owns a single network and a single worker, which both run on
 * the caller's event list.
124 */
126 bool running; //!< is the scheduler running?
127
128 CONF_SECTION *cs; //!< thread pool configuration section
129 fr_event_list_t *el; //!< event list for single-threaded mode.
130
131 fr_log_t *log; //!< log destination
132 fr_log_lvl_t lvl; //!< log level
133
134 fr_schedule_config_t *config; //!< configuration
135
136 unsigned int num_workers_exited; //!< number of exited workers (NOTE(review): not referenced in this file)
137
138 sem_t worker_sem; //!< posted by each worker once it has started (or failed to start), and again when a started worker exits
139 sem_t network_sem; //!< posted by each network once it has started (or failed to start), and again when a started network exits
140
143
144 fr_dlist_head_t workers; //!< list of workers
145 fr_dlist_head_t networks; //!< list of networks
146
147 fr_network_t *single_network; //!< for single-threaded mode
148 fr_worker_t *single_worker; //!< for single-threaded mode
149};
150
151static _Thread_local int worker_id; //!< Internal ID of the current worker thread (set at worker thread startup; 0 in any thread that never sets it).
152
153/** Return the worker id for the current thread
154 *
 * Only worker threads started by the scheduler set this value.  In any other
 * thread (network threads, the main thread, single-threaded mode) the value
 * is 0, which cannot be distinguished from worker 0.
 *
155 * @return worker ID
156 */
158{
159 return worker_id;
160}
161
162/** Entry point for worker threads
163 *
 * The thread does the following, in order:
 * - allocates the worker's talloc ctx, event list and fr_worker_t;
 * - runs the optional worker_thread_instantiate callback;
 * - adds the worker to every network thread;
 * - runs the worker until it exits.
 *
 * sc->worker_sem is posted once when the worker has started, and once more
 * when the thread is about to exit.  If startup fails, only the second post
 * happens, so the parent sees exactly one post per thread during startup.
 *
164 * @param[in] arg the fr_schedule_worker_t
165 * @return NULL
166 */
167static void *fr_schedule_worker_thread(void *arg)
168{
169 TALLOC_CTX *ctx;
170 fr_schedule_worker_t *sw = talloc_get_type_abort(arg, fr_schedule_worker_t);
171 fr_schedule_t *sc = sw->sc;
174 char worker_name[32];
175
176#ifndef __APPLE__
177 /*
178 * This ifdef is because macOS doesn't use pthread_sigmask in its
179 * setcontext function, and seems to apply the signal mask of the thread
180 * to the entire process when setcontext is called.
181 *
182 * * frame #0: 0x00000001934118b0 libsystem_kernel.dylib`sigprocmask
183 * frame #1: 0x0000000193481f3c libsystem_platform.dylib`setcontext + 44
184 * frame #2: 0x0000000100f27298 libcrypto.3.dylib`async_fibre_swapcontext + 52
185 * frame #3: 0x0000000100f274a0 libcrypto.3.dylib`ASYNC_start_job + 496
186 * frame #4: 0x0000000100b17884 libssl.3.dylib`ssl_start_async_job + 116
187 * frame #5: 0x0000000100b17804 libssl.3.dylib`ssl_read_internal + 356
188 * frame #6: 0x0000000100b17a0c libssl.3.dylib`SSL_read + 28
189 * frame #7: 0x00000001004f5b94 libfreeradius-tls.dylib`tls_session_async_handshake_cont(p_result=0x0000000112815c7c, priority=0x0000000112815edc, request=0x0000000112815a80, uctx=0x0000000139160060) at session.c:1366:26
190 */
191 sigset_t sigset;
192
193 sigfillset(&sigset);
194
195 /*
196 * Ensure workers aren't interrupted by signals.
197 * The main thread, and main event loop are mostly
198 * idle, so they can handle signals.
199 */
200 pthread_sigmask(SIG_BLOCK, &sigset, NULL);
201#endif
202
203 worker_id = sw->id; /* Store the current worker ID */
204
205 snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->id);
206
207 sw->ctx = ctx = talloc_init("%s", worker_name);
208 if (!ctx) {
209 ERROR("%s - Failed allocating memory", worker_name);
210 goto fail;
211 }
212
213 INFO("%s - Starting", worker_name);
214
215 sw->el = fr_event_list_alloc(ctx, NULL, NULL);
216 if (!sw->el) {
217 PERROR("%s - Failed creating event list", worker_name);
218 goto fail;
219 }
220
221
222 sw->worker = fr_worker_alloc(ctx, sw->el, worker_name, sc->log, sc->lvl, &sc->config->worker);
223 if (!sw->worker) {
224 PERROR("%s - Failed creating worker", worker_name);
225 goto fail;
226 }
227
228 /*
229 * @todo make this a registry
 *	Prefer a "worker <id>" subsection, falling back to a bare "worker" one.
230 */
231 if (sc->worker_thread_instantiate) {
232 CONF_SECTION *cs;
233 char section_name[32];
234
235 snprintf(section_name, sizeof(section_name), "%u", sw->id);
236
237 cs = cf_section_find(sc->cs, "worker", section_name);
238 if (!cs) cs = cf_section_find(sc->cs, "worker", NULL);
239
240 if (sc->worker_thread_instantiate(sw->ctx, sw->el, cs) < 0) {
241 PERROR("%s - Worker thread instantiation failed", worker_name);
242 goto fail;
243 }
244 }
245
247
248 /*
249 * Add this worker to all network threads.
250 */
251 for (sn = fr_dlist_head(&sc->networks);
252 sn != NULL;
253 sn = fr_dlist_next(&sc->networks, sn)) {
254 if (unlikely(fr_network_worker_add(sn->nr, sw->worker) < 0)) {
255 PERROR("%s - Failed adding worker to network %u", worker_name, sn->id);
256 goto fail; /* FIXME - Should maybe try to undo partial adds? */
257 }
258 }
259
260 DEBUG3("%s - Started", worker_name);
261
262 /*
263 * Tell the originator that the thread has started.
264 */
265 sem_post(&sc->worker_sem);
266
267 /*
268 * Do all of the work.
269 */
270 fr_worker(sw->worker);
271
272 status = FR_CHILD_EXITED;
273
274fail:
275 sw->status = status;
276
277 if (sw->worker) {
279 sw->worker = NULL;
280 }
281
282 INFO("%s - Exiting", worker_name);
283
284 if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); /* Fixme once we figure out what uctx should be.  NOTE(review): this also runs when startup failed before or inside worker_thread_instantiate */
285
286 /*
287 * Not looping at this point, but may catch timer/fd
288 * insertions being done after the thread should have
289 * exited.
290 */
291 if (sw->el) fr_event_loop_exit(sw->el, 1);
292
293 /*
294 * Tell the scheduler we're done.
295 */
296 sem_post(&sc->worker_sem);
297
298 talloc_free(ctx); /* NOTE(review): sw->ctx and sw->el still point into the freed ctx */
299
300 return NULL;
301}
302
303
304static void stats_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
305{
306 fr_schedule_network_t *sn = talloc_get_type_abort(uctx, fr_schedule_network_t);
307
308 fr_network_stats_log(sn->nr, sn->sc->log);
309
310 (void) fr_timer_at(sn, tl, &sn->ev, fr_time_add(now, sn->sc->config->stats_interval), false, stats_timer, sn);
311}
312
313/** Initialize and run the network thread.
314 *
 * Allocates the thread's talloc ctx, event list and fr_network_t, then runs
 * the network's event loop until it exits.  If a stats_interval is
 * configured, it also starts the periodic stats_timer().
 *
 * sc->network_sem is posted once when the network has started, and once more
 * when the thread is about to exit.  If startup fails, only the second post
 * happens.
 *
315 * @param[in] arg the fr_schedule_network_t
316 * @return NULL
317 */
318static void *fr_schedule_network_thread(void *arg)
319{
320 TALLOC_CTX *ctx;
321 fr_schedule_network_t *sn = talloc_get_type_abort(arg, fr_schedule_network_t);
322 fr_schedule_t *sc = sn->sc;
325 char network_name[32];
326
327#ifndef __APPLE__
328 /*
329 * This ifdef is because macOS doesn't use pthread_sigmask in its
330 * setcontext function, and seems to apply the signal mask of the thread
331 * to the entire process when setcontext is called.
332 *
333 * * frame #0: 0x00000001934118b0 libsystem_kernel.dylib`sigprocmask
334 * frame #1: 0x0000000193481f3c libsystem_platform.dylib`setcontext + 44
335 * frame #2: 0x0000000100f27298 libcrypto.3.dylib`async_fibre_swapcontext + 52
336 * frame #3: 0x0000000100f274a0 libcrypto.3.dylib`ASYNC_start_job + 496
337 * frame #4: 0x0000000100b17884 libssl.3.dylib`ssl_start_async_job + 116
338 * frame #5: 0x0000000100b17804 libssl.3.dylib`ssl_read_internal + 356
339 * frame #6: 0x0000000100b17a0c libssl.3.dylib`SSL_read + 28
340 * frame #7: 0x00000001004f5b94 libfreeradius-tls.dylib`tls_session_async_handshake_cont(p_result=0x0000000112815c7c, priority=0x0000000112815edc, request=0x0000000112815a80, uctx=0x0000000139160060) at session.c:1366:26
341 */
342 sigset_t sigset;
343
344 sigfillset(&sigset);
345
346 /*
347 * Ensure network threads aren't interrupted by signals.
348 * The main thread, and main event loop are mostly
349 * idle, so they can handle signals.
350 */
351 pthread_sigmask(SIG_BLOCK, &sigset, NULL);
352#endif
353
354 snprintf(network_name, sizeof(network_name), "Network %d", sn->id);
355
356 INFO("%s - Starting", network_name);
357
358 sn->ctx = ctx = talloc_init("%s", network_name);
359 if (!ctx) {
360 ERROR("%s - Failed allocating memory", network_name);
361 goto fail;
362 }
363
364 el = fr_event_list_alloc(ctx, NULL, NULL);
365 if (!el) {
366 PERROR("%s - Failed creating event list", network_name);
367 goto fail;
368 }
369
370 sn->nr = fr_network_create(ctx, el, network_name, sc->log, sc->lvl, &sc->config->network);
371 if (!sn->nr) {
372 PERROR("%s - Failed creating network", network_name);
373 goto fail;
374 }
375
377
378 /*
379 * Tell the originator that the thread has started.
380 */
381 sem_post(&sc->network_sem);
382
383 DEBUG3("%s - Started", network_name);
384
385 /*
386 * Print out statistics for this network IO handler.
387 */
388 if (fr_time_delta_ispos(sc->config->stats_interval)) {
389 (void) fr_timer_in(sn, el->tl, &sn->ev, sn->sc->config->stats_interval, false, stats_timer, sn);
390 }
391 /*
392 * Call the main event processing loop of the network
393 * thread.  Will not return until the network is about
394 * to exit.
395 */
396 fr_network(sn->nr);
397
398 status = FR_CHILD_EXITED;
399
400fail:
401 sn->status = status;
402
403 INFO("%s - Exiting", network_name);
404
405 /*
406 * Tell the scheduler we're done.
407 */
408 sem_post(&sc->network_sem);
409
410 talloc_free(ctx); /* NOTE(review): this frees sn->nr, which sn (still on sc->networks) keeps pointing to */
411
412 return NULL;
413}
414
/** Creates a new thread using our standard set of options
 *
 * New threads are joinable, i.e. you can (and should) call pthread_join() on
 * them to confirm they've exited.  This lets the parent determine that all
 * of its children have exited, so it can continue with a graceful shutdown.
 *
 * Signal handling is not configured here.  The new thread inherits the signal
 * mask of the calling thread.  The scheduler's own thread entry points block
 * signals themselves (except on macOS).
 *
 * @param[out] thread	handle of the created thread, as filled in by pthread_create().
 * @param[in] func	entry point for the thread.
 * @param[in] arg	Argument to pass to func.
 * @return
 *	- 0 on success.
 *	- -1 on failure, with an error message set via fr_strerror_printf().
 */
int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *arg)
{
	pthread_attr_t	attr;
	int		ret;

	/*
	 *	pthread_create() must never be handed an attribute
	 *	object which failed to initialise.
	 */
	ret = pthread_attr_init(&attr);
	if (ret != 0) {
		fr_strerror_printf("Failed initialising thread attributes: %s", fr_syserror(ret));
		return -1;
	}

	/*
	 *	Set the thread to wait around after it's exited
	 *	so it can be joined.
	 */
	ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
	if (ret != 0) {
		pthread_attr_destroy(&attr);
		fr_strerror_printf("Failed setting thread detach state: %s", fr_syserror(ret));
		return -1;
	}

	ret = pthread_create(thread, &attr, func, arg);
	pthread_attr_destroy(&attr);	/* no longer needed, whether or not the thread was created */
	if (ret != 0) {
		fr_strerror_printf("Failed creating thread: %s", fr_syserror(ret));
		return -1;
	}

	return 0;
}
453
454/** Create a scheduler and spawn the child threads.
455 *
 * Single-threaded mode: if `el` is non-NULL, no threads are spawned.  A single
 * network and a single worker are created on `el` instead.
 *
 * Multi-threaded mode: max_networks network threads are started first, then
 * max_workers worker threads.  The scheduler is only returned if every one of
 * them started successfully.
 *
456 * @param[in] ctx talloc context.
457 * @param[in] el event list, only for single-threaded mode.
458 * @param[in] logger destination for all logging messages.
459 * @param[in] lvl log level.
460 * @param[in] worker_thread_instantiate callback for new worker threads.
461 * @param[in] worker_thread_detach callback to destroy resources
462 * allocated by worker_thread_instantiate.
463 * @param[in] config configuration for the scheduler
 *	If NULL, a default of 1 network and 4 workers is used.  Otherwise
 *	max_networks and max_workers are clamped to the range 1..64.
 *	Note that the clamping modifies the caller's config in place.
464 * @return
465 * - NULL on error
466 * - fr_schedule_t new scheduler
467 */
469 fr_log_t *logger, fr_log_lvl_t lvl,
470 fr_schedule_thread_instantiate_t worker_thread_instantiate,
471 fr_schedule_thread_detach_t worker_thread_detach,
473{
474 unsigned int i;
475 fr_schedule_worker_t *sw, *next_sw;
476 fr_schedule_network_t *sn, *next_sn;
478
479 sc = talloc_zero(ctx, fr_schedule_t);
480 if (!sc) {
481 fr_strerror_const("Failed allocating memory");
482 return NULL;
483 }
484
485 /*
486 * Parse any scheduler-specific configuration.
487 */
488 if (!config) {
489 MEM(sc->config = talloc_zero(sc, fr_schedule_config_t));
490 sc->config->max_networks = 1;
491 sc->config->max_workers = 4;
492 } else {
493 sc->config = config;
494
495 if (sc->config->max_networks < 1) sc->config->max_networks = 1;
496 if (sc->config->max_networks > 64) sc->config->max_networks = 64;
497 if (sc->config->max_workers < 1) sc->config->max_workers = 1;
498 if (sc->config->max_workers > 64) sc->config->max_workers = 64;
499 }
500
501 sc->el = el;
502 sc->log = logger;
503 sc->lvl = lvl;
504 sc->cs = sc->config->cs;
505
506 sc->worker_thread_instantiate = worker_thread_instantiate;
507 sc->worker_thread_detach = worker_thread_detach;
508 sc->running = true;
509
510 /*
511 * If we're single-threaded, create network / worker, and insert them into the event loop.
512 */
513 if (el) {
514 sc->single_network = fr_network_create(sc, el, "Network", sc->log, sc->lvl, &sc->config->network);
515 if (!sc->single_network) {
516 PERROR("Failed creating network");
517 pre_instantiate_st_fail:
519 return NULL;
520 }
521
522 sc->single_worker = fr_worker_alloc(sc, el, "Worker", sc->log, sc->lvl, &sc->config->worker);
523 if (!sc->single_worker) {
524 PERROR("Failed creating worker");
525 if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
526 PERROR("Failed destroying network");
527 }
528 goto pre_instantiate_st_fail;
529 }
530
531 /*
532 * Parent thread-specific data from the single_worker
533 */
534 if (sc->worker_thread_instantiate) {
535 CONF_SECTION *subcs;
536
537 subcs = cf_section_find(sc->cs, "worker", "0");
538 if (!subcs) subcs = cf_section_find(sc->cs, "worker", NULL);
539
540 if (sc->worker_thread_instantiate(sc->single_worker, el, subcs) < 0) {
541 PERROR("Worker thread instantiation failed");
542 destroy_both:
543 if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
544 PERROR("Failed destroying network");
545 }
546 fr_worker_destroy(sc->single_worker);
547 goto pre_instantiate_st_fail;
548 }
549 }
550
551 if (fr_command_register_hook(NULL, "0", sc->single_worker, cmd_worker_table) < 0) {
552 PERROR("Failed adding worker commands");
553 st_fail:
554 if (sc->worker_thread_detach) sc->worker_thread_detach(NULL);
555 goto destroy_both;
556 }
557
558 if (fr_command_register_hook(NULL, "0", sc->single_network, cmd_network_table) < 0) {
559 PERROR("Failed adding network commands");
560 goto st_fail;
561 }
562
563 /*
564 * Register the worker with the network, so
565 * things like fr_network_send_request() work.
566 */
567 fr_network_worker_add_self(sc->single_network, sc->single_worker);
568 DEBUG("Scheduler created in single-threaded mode");
569
570 if (fr_event_pre_insert(el, fr_worker_pre_event, sc->single_worker) < 0) {
571 fr_strerror_const("Failed adding pre-check to event list");
572 goto st_fail;
573 }
574
575 /*
576 * Add the event which processes request_t packets.
577 */
578 if (fr_event_post_insert(el, fr_worker_post_event, sc->single_worker) < 0) {
579 fr_strerror_const("Failed inserting post-processing event");
580 goto st_fail;
581 }
582
583 return sc;
584 }
585
586 /*
587 * Create the lists which hold the workers and networks.
588 */
589 fr_dlist_init(&sc->workers, fr_schedule_worker_t, entry);
590 fr_dlist_init(&sc->networks, fr_schedule_network_t, entry);
591
592 memset(&sc->network_sem, 0, sizeof(sc->network_sem));
593 if (sem_init(&sc->network_sem, 0, SEMAPHORE_LOCKED) != 0) {
594 ERROR("Failed creating semaphore: %s", fr_syserror(errno));
596 return NULL;
597 }
598
599 memset(&sc->worker_sem, 0, sizeof(sc->worker_sem));
600 if (sem_init(&sc->worker_sem, 0, SEMAPHORE_LOCKED) != 0) {
601 ERROR("Failed creating semaphore: %s", fr_syserror(errno));
602 sem_destroy(&sc->network_sem);
604 return NULL;
605 }
606
607 /*
608 * Create the network threads first.
609 */
610 for (i = 0; i < sc->config->max_networks; i++) {
611 DEBUG3("Creating %u/%u networks", i + 1, sc->config->max_networks);
612
613 /*
614 * Create a network "glue" structure
615 */
616 sn = talloc_zero(sc, fr_schedule_network_t);
617 if (!sn) {
618 ERROR("Network %u - Failed allocating memory", i);
619 break;
620 }
621
622 sn->id = i;
623 sn->sc = sc;
625
627 talloc_free(sn);
628 PERROR("Failed creating network %u", i);
629 break;
630 }
631
632 fr_dlist_insert_head(&sc->networks, sn);
633 }
634
635 /*
636 * Wait for all of the networks to signal us that either
637 * they've started, OR there's been a problem and they
638 * can't start.
639 */
640 for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) {
641 DEBUG3("Waiting for semaphore from network %u/%u",
642 i + 1, (unsigned int)fr_dlist_num_elements(&sc->networks));
643 SEM_WAIT_INTR(&sc->network_sem);
644 }
645
646 /*
647 * See if all of the networks have started.  Any which haven't
 *	are dropped from the list.  NOTE(review): dropped threads are
 *	never pthread_join()ed by fr_schedule_destroy().
648 */
649 for (sn = fr_dlist_head(&sc->networks);
650 sn != NULL;
651 sn = next_sn) {
652 next_sn = fr_dlist_next(&sc->networks, sn);
653
654 if (sn->status != FR_CHILD_RUNNING) {
655 fr_dlist_remove(&sc->networks, sn);
656 continue;
657 }
658 }
659
660 /*
661 * Failed to start some networks, refuse to do anything!
662 */
663 if ((unsigned int)fr_dlist_num_elements(&sc->networks) < sc->config->max_networks) {
665 return NULL;
666 }
667
668 /*
669 * Create all of the workers.
670 */
671 for (i = 0; i < sc->config->max_workers; i++) {
672 DEBUG3("Creating %u/%u workers", i + 1, sc->config->max_workers);
673
674 /*
675 * Create a worker "glue" structure
676 */
677 sw = talloc_zero(sc, fr_schedule_worker_t);
678 if (!sw) {
679 ERROR("Worker %u - Failed allocating memory", i);
680 break;
681 }
682
683 sw->id = i;
684 sw->sc = sc;
686
688 talloc_free(sw);
689 PERROR("Failed creating worker %u", i);
690 break;
691 }
692
693 fr_dlist_insert_head(&sc->workers, sw);
694 }
695
696 /*
697 * Wait for all of the workers to signal us that either
698 * they've started, OR there's been a problem and they
699 * can't start.
700 */
701 for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) {
702 DEBUG3("Waiting for semaphore from worker %u/%u",
703 i + 1, (unsigned int)fr_dlist_num_elements(&sc->workers));
704 SEM_WAIT_INTR(&sc->worker_sem);
705 }
706
707 /*
708 * See if all of the workers have started.  Any which haven't
 *	are dropped from the list.  NOTE(review): dropped threads are
 *	never pthread_join()ed by fr_schedule_destroy().
709 */
710 for (sw = fr_dlist_head(&sc->workers);
711 sw != NULL;
712 sw = next_sw) {
713
714 next_sw = fr_dlist_next(&sc->workers, sw);
715
716 if (sw->status != FR_CHILD_RUNNING) {
717 fr_dlist_remove(&sc->workers, sw);
718 continue;
719 }
720 }
721
722 /*
723 * Failed to start some workers, refuse to do anything!
724 */
725 if ((unsigned int)fr_dlist_num_elements(&sc->workers) < sc->config->max_workers) {
727 return NULL;
728 }
729
730 for (sw = fr_dlist_head(&sc->workers), i = 0;
731 sw != NULL;
732 sw = next_sw, i++) {
733 char buffer[32];
734
735 next_sw = fr_dlist_next(&sc->workers, sw);
736
737 snprintf(buffer, sizeof(buffer), "%d", i); /* NOTE(review): i is the list position, not sw->id.  Workers were inserted at the head, so these numbers run in reverse of the "Worker N" thread IDs */
739 PERROR("Failed adding worker commands");
740 mt_fail:
742 return NULL;
743 }
744 }
745
746 for (sn = fr_dlist_head(&sc->networks), i = 0;
747 sn != NULL;
748 sn = next_sn, i++) {
749 char buffer[32];
750
751 next_sn = fr_dlist_next(&sc->networks, sn);
752
753 snprintf(buffer, sizeof(buffer), "%d", i); /* NOTE(review): list position, not sn->id (see the worker loop above) */
755 PERROR("Failed adding network commands");
756 goto mt_fail;
757 }
758 }
759
760 if (sc) INFO("Scheduler created successfully with %u networks and %u workers",
761 sc->config->max_networks, (unsigned int)fr_dlist_num_elements(&sc->workers));
762
763 return sc;
764}
765
766/** Destroy a scheduler, and tell its child threads to exit.
767 *
768 * @note This may be called with no worker or network threads in the case of an
769 * instantiation error. This function _should_ deal with that condition
770 * gracefully.
771 *
772 * @param[in,out] sc_to_free the scheduler.  *sc_to_free is set to NULL on return.
773 * @return
774 * - <0 on error
775 * - 0 on success
 *	NOTE(review): every return path shown here returns 0; failures
 *	are only logged.
776 */
778{
779 fr_schedule_t *sc = *sc_to_free;
780 unsigned int i;
783 int ret;
784
785 if (!sc) return 0;
786
787 sc->running = false;
788
789 /*
790 * Single threaded mode: kill the only network / worker we have.
 *	NOTE(review): worker_thread_detach is not called on this path,
 *	even though fr_schedule_create() ran worker_thread_instantiate.
791 */
792 if (sc->el) {
793 /*
794 * Destroy the network side first. It tells the
795 * workers to close.
796 */
797 if (unlikely(fr_network_destroy(sc->single_network) < 0)) {
798 ERROR("Failed destroying network");
799 }
800 fr_worker_destroy(sc->single_worker);
801 goto done;
802 }
803
804 /*
805 * Signal each network thread to exit.
806 */
807 for (sn = fr_dlist_head(&sc->networks);
808 sn != NULL;
809 sn = fr_dlist_next(&sc->networks, sn)) {
810 if (fr_network_exit(sn->nr) < 0) {
811 PERROR("Failed signaling network %i to exit", sn->id);
812 }
813 }
814
815 /*
816 * If the network threads are running, tell them to exit,
817 * and wait for them to do so. Each network thread tells
818 * all of its worker threads that it's exiting. It then
819 * closes the channels. When the workers see that there
820 * are no input channels, they exit, too.
821 */
822 for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) {
823 DEBUG2("Scheduler - Waiting for semaphore indicating network exit %u/%u", i + 1,
824 (unsigned int)fr_dlist_num_elements(&sc->networks));
825 SEM_WAIT_INTR(&sc->network_sem);
826 }
827 DEBUG2("Scheduler - All networks indicated exit complete");
828
829 while ((sn = fr_dlist_head(&sc->networks)) != NULL) {
830 fr_dlist_remove(&sc->networks, sn);
831
832 /*
833 * Ensure that the thread has exited before
834 * cleaning up the context.
835 *
836 * This also ensures that the child threads have
837 * exited before the main thread cleans up the
838 * module instances.
839 */
840 if ((ret = pthread_join(sn->pthread_id, NULL)) != 0) {
841 ERROR("Failed joining network %i: %s", sn->id, fr_syserror(ret));
842 } else {
843 DEBUG2("Network %i joined (cleaned up)", sn->id);
844 }
845 }
846
847 /*
848 * Wait for all worker threads to finish. THEN clean up
849 * modules. Otherwise, the modules will be removed from
850 * underneath the workers!
851 */
852 for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) {
853 DEBUG2("Scheduler - Waiting for semaphore indicating worker exit %u/%u", i + 1,
854 (unsigned int)fr_dlist_num_elements(&sc->workers));
855 SEM_WAIT_INTR(&sc->worker_sem);
856 }
857 DEBUG2("Scheduler - All workers indicated exit complete");
858
859 /*
860 * Clean up the exited workers.
861 */
862 while ((sw = fr_dlist_head(&sc->workers)) != NULL) {
863 fr_dlist_remove(&sc->workers, sw);
864
865 /*
866 * Ensure that the thread has exited before
867 * cleaning up the context.
868 *
869 * This also ensures that the child threads have
870 * exited before the main thread cleans up the
871 * module instances.
872 */
873 if ((ret = pthread_join(sw->pthread_id, NULL)) != 0) {
874 ERROR("Failed joining worker %i: %s", sw->id, fr_syserror(ret));
875 } else {
876 DEBUG2("Worker %i joined (cleaned up)", sw->id);
877 }
878 }
879
880 sem_destroy(&sc->network_sem);
881 sem_destroy(&sc->worker_sem);
882done:
883 /*
884 * Now that all of the workers are done, we can return to
885 * the caller, and have it dlclose() the modules.
886 */
888 *sc_to_free = NULL;
889
890 return 0;
891}
892
893/** Add a fr_listen_t to a scheduler.
894 *
 * In single-threaded mode the listener goes to the single network.  In
 * multi-threaded mode it always goes to the network at the head of
 * sc->networks.
 *
895 * @param[in] sc the scheduler
896 * @param[in] li the ctx and callbacks for the transport.
897 * @return
898 * - NULL on error
899 * - the fr_network_t that the socket was added to.
900 */
902{
903 fr_network_t *nr;
904
905 (void) talloc_get_type_abort(sc, fr_schedule_t);
906
907 if (sc->el) {
908 nr = sc->single_network;
909 } else {
911
912 /*
913 * @todo - round robin it among the listeners?
914 * or maybe add it to the same parent thread?
 *
 *	The list is assumed to be non-empty.  fr_schedule_create()
 *	refuses to return a scheduler with fewer than max_networks
 *	(>= 1) running networks.
915 */
916 sn = fr_dlist_head(&sc->networks);
917 nr = sn->nr;
918 }
919
920 if (fr_network_listen_add(nr, li) < 0) return NULL;
921
922 return nr;
923}
924
925/** Add a directory watch (NOTE_EXTEND) to a scheduler.
926 *
 * The watch is handed to fr_network_directory_add().  In single-threaded mode
 * it goes to the single network; otherwise it goes to the network at the head
 * of sc->networks.
 *
927 * @param[in] sc the scheduler
928 * @param[in] li the ctx and callbacks for the transport.
929 * @return
930 * - NULL on error
931 * - the fr_network_t that the directory watch was added to.
932 */
934{
935 fr_network_t *nr;
936
937 (void) talloc_get_type_abort(sc, fr_schedule_t);
938
939 if (sc->el) {
940 nr = sc->single_network;
941 } else {
943
944 /*
945 * @todo - round robin it among the listeners?
946 * or maybe add it to the same parent thread?
 *
 *	The list is assumed to be non-empty.  fr_schedule_create()
 *	refuses to return a scheduler with fewer than max_networks
 *	(>= 1) running networks.
947 */
948 sn = fr_dlist_head(&sc->networks);
949 nr = sn->nr;
950 }
951
952 if (fr_network_directory_add(nr, li) < 0) return NULL;
953
954 return nr;
955}
static int const char char buffer[256]
Definition acutest.h:578
#define RCSID(id)
Definition build.h:487
#define unlikely(_x)
Definition build.h:383
A section grouping multiple CONF_PAIR.
Definition cf_priv.h:101
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition cf_util.c:1027
fr_command_register_hook_t fr_command_register_hook
Definition command.c:42
#define MEM(x)
Definition debug.h:36
#define ERROR(fmt,...)
Definition dhcpclient.c:41
#define DEBUG(fmt,...)
Definition dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition dlist.h:242
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition dlist.h:468
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition dlist.h:620
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition dlist.h:921
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition dlist.h:320
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition dlist.h:537
Head of a doubly linked list.
Definition dlist.h:51
Entry in a doubly linked list.
Definition dlist.h:41
talloc_free(hp)
fr_cmd_table_t cmd_network_table[]
Definition network.c:2138
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition network.c:236
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in a different thread.
Definition network.c:305
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition network.c:1726
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition network.c:290
void fr_network(fr_network_t *nr)
The main network worker function.
Definition network.c:1842
void fr_network_worker_add_self(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network in the same thread.
Definition network.c:325
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition network.c:1897
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition network.c:2071
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition network.c:1930
#define PERROR(_fmt,...)
Definition log.h:228
#define DEBUG3(_fmt,...)
Definition log.h:266
fr_event_list_t * fr_event_list_alloc(TALLOC_CTX *ctx, fr_event_status_cb_t status, void *status_uctx)
Initialise a new event list.
Definition event.c:2522
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition event.c:1953
void fr_event_loop_exit(fr_event_list_t *el, int code)
Signal an event loop exit with the specified code.
Definition event.c:2371
int fr_event_post_insert(fr_event_list_t *el, fr_event_post_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition event.c:2007
Stores all information relating to an event list.
Definition event.c:377
fr_log_lvl_t
Definition log.h:67
static const conf_parser_t config[]
Definition base.c:169
#define DEBUG2(fmt,...)
static bool done
Definition radclient.c:83
#define INFO(fmt,...)
Definition radict.c:64
CONF_SECTION * cs
thread pool configuration section
Definition schedule.c:128
TALLOC_CTX * ctx
our allocation ctx
Definition schedule.c:106
fr_timer_t * ev
timer for stats_interval
Definition schedule.c:118
sem_t worker_sem
for inter-thread signaling
Definition schedule.c:138
fr_schedule_child_status_t status
status of the network
Definition schedule.c:115
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition schedule.c:151
fr_event_list_t * el
event list for single-threaded mode.
Definition schedule.c:129
fr_schedule_t * sc
the scheduler we are running under
Definition schedule.c:113
fr_worker_t * single_worker
for single-threaded mode
Definition schedule.c:148
fr_log_lvl_t lvl
log level
Definition schedule.c:132
static void stats_timer(fr_timer_list_t *tl, fr_time_t now, void *uctx)
Definition schedule.c:304
fr_network_t * fr_schedule_directory_add(fr_schedule_t *sc, fr_listen_t *li)
Add a directory watch (NOTE_EXTEND) to a scheduler.
Definition schedule.c:933
#define SEM_WAIT_INTR(_x)
Definition schedule.c:65
fr_schedule_config_t * config
configuration
Definition schedule.c:134
fr_network_t * single_network
for single-threaded mode
Definition schedule.c:147
fr_schedule_thread_instantiate_t worker_thread_instantiate
thread instantiation callback
Definition schedule.c:141
fr_schedule_child_status_t
Track the child thread status.
Definition schedule.c:70
@ FR_CHILD_FAIL
failed, and in the exited queue
Definition schedule.c:75
@ FR_CHILD_FREE
child is free
Definition schedule.c:71
@ FR_CHILD_RUNNING
running, and in the running queue
Definition schedule.c:73
@ FR_CHILD_EXITED
exited, and in the exited queue
Definition schedule.c:74
@ FR_CHILD_INITIALIZING
initialized, but not running
Definition schedule.c:72
fr_network_t * fr_schedule_listen_add(fr_schedule_t *sc, fr_listen_t *li)
Add a fr_listen_t to a scheduler.
Definition schedule.c:901
fr_network_t * nr
the receive data structure
Definition schedule.c:116
fr_schedule_thread_detach_t worker_thread_detach
Definition schedule.c:142
bool running
is the scheduler running?
Definition schedule.c:126
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
Definition schedule.c:157
static void * fr_schedule_worker_thread(void *arg)
Entry point for worker threads.
Definition schedule.c:167
int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *arg)
Creates a new thread using our standard set of options.
Definition schedule.c:428
fr_schedule_t * fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_log_t *logger, fr_log_lvl_t lvl, fr_schedule_thread_instantiate_t worker_thread_instantiate, fr_schedule_thread_detach_t worker_thread_detach, fr_schedule_config_t *config)
Create a scheduler and spawn the child threads.
Definition schedule.c:468
unsigned int num_workers_exited
number of exited workers
Definition schedule.c:136
sem_t network_sem
for inter-thread signaling
Definition schedule.c:139
int fr_schedule_destroy(fr_schedule_t **sc_to_free)
Destroy a scheduler, and tell its child threads to exit.
Definition schedule.c:777
unsigned int id
a unique ID
Definition schedule.c:109
fr_dlist_head_t networks
list of networks
Definition schedule.c:145
pthread_t pthread_id
the thread of this network
Definition schedule.c:107
fr_log_t * log
log destination
Definition schedule.c:131
fr_dlist_head_t workers
list of workers
Definition schedule.c:144
static void * fr_schedule_network_thread(void *arg)
Initialize and run the network thread.
Definition schedule.c:318
fr_dlist_t entry
our entry into the linked list of networks
Definition schedule.c:111
#define SEMAPHORE_LOCKED
Definition schedule.c:46
Scheduler specific information for network threads.
Definition schedule.c:105
The scheduler.
Definition schedule.c:125
int(* fr_schedule_thread_instantiate_t)(TALLOC_CTX *ctx, fr_event_list_t *el, void *uctx)
Setup a new thread.
Definition schedule.h:55
void(* fr_schedule_thread_detach_t)(void *uctx)
Explicitly free resources allocated by fr_schedule_thread_instantiate_t.
Definition schedule.h:61
fr_time_delta_t stats_interval
print channel statistics
Definition schedule.h:70
static const uchar sc[16]
Definition smbdes.c:115
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition snprintf.c:689
Definition log.h:96
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition syserror.c:243
#define fr_time_delta_ispos(_a)
Definition time.h:290
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition time.h:196
"server local" time.
Definition time.h:69
An event timer list.
Definition timer.c:50
A timer event.
Definition timer.c:84
#define fr_timer_in(...)
Definition timer.h:87
#define fr_timer_at(...)
Definition timer.h:81
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition strerror.h:64
#define fr_strerror_const(_msg)
Definition strerror.h:223
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition worker.c:1565
fr_worker_t * fr_worker_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition worker.c:1358
fr_cmd_table_t cmd_worker_table[]
Definition worker.c:1756
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition worker.c:1018
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition worker.c:1586
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition worker.c:1507
A worker which takes packets from a master, and processes them.
Definition worker.c:98
unsigned int id
a unique ID
Definition schedule.c:88
int uses
how many network threads are using it
Definition schedule.c:89
pthread_t pthread_id
the thread of this worker
Definition schedule.c:86
fr_schedule_t * sc
the scheduler we are running under
Definition schedule.c:94
TALLOC_CTX * ctx
our allocation ctx
Definition schedule.c:84
fr_event_list_t * el
our event list
Definition schedule.c:85
fr_time_t cpu_time
how much CPU time this worker has used
Definition schedule.c:90
fr_dlist_t entry
our entry into the linked list of workers
Definition schedule.c:92
fr_schedule_child_status_t status
status of the worker
Definition schedule.c:96
fr_worker_t * worker
the worker data structure
Definition schedule.c:97
Scheduler specific information for worker threads.
Definition schedule.c:83