The FreeRADIUS server  $Id: 15bac2a4c627c01d1aa2047687b3418955ac7f00 $
schedule.c
Go to the documentation of this file.
1 /*
2  * This program is free software; you can redistribute it and/or modify
3  * it under the terms of the GNU General Public License as published by
4  * the Free Software Foundation; either version 2 of the License, or
5  * (at your option) any later version.
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
15  */
16 
17 /**
18  * $Id: eb4ec47bf0228a87b36c1f8c69b6d8e8f919a280 $
19  *
20  * @brief Network / worker thread scheduling
21  * @file io/schedule.c
22  *
23  * @copyright 2016 Alan DeKok (aland@freeradius.org)
24  */
25 RCSID("$Id: eb4ec47bf0228a87b36c1f8c69b6d8e8f919a280 $")
26 
27 #define LOG_DST sc->log
28 
29 #include <freeradius-devel/autoconf.h>
30 
31 #include <freeradius-devel/io/schedule.h>
32 #include <freeradius-devel/util/dlist.h>
33 #include <freeradius-devel/util/rb.h>
34 #include <freeradius-devel/util/syserror.h>
35 #include <freeradius-devel/server/trigger.h>
36 
37 #include <pthread.h>
38 
39 /*
40  * Other OS's have sem_init, OS X doesn't.
41  */
42 #ifdef HAVE_SEMAPHORE_H
43 #include <semaphore.h>
44 #endif
45 
46 #define SEMAPHORE_LOCKED (0)
47 
48 #ifdef __APPLE__
49 #include <mach/task.h>
50 #include <mach/mach_init.h>
51 #include <mach/semaphore.h>
52 
53 #undef sem_t
54 #define sem_t semaphore_t
55 #undef sem_init
56 #define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
57 #undef sem_wait
58 #define sem_wait(s) semaphore_wait(*s)
59 #undef sem_post
60 #define sem_post(s) semaphore_signal(*s)
61 #undef sem_destroy
62 #define sem_destroy(s) semaphore_destroy(mach_task_self(),*s)
63 #endif /* __APPLE__ */
64 
65 #define SEM_WAIT_INTR(_x) do {if (sem_wait(_x) == 0) break;} while (errno == EINTR)
66 
67 /**
68  * Track the child thread status.
69  */
71  FR_CHILD_FREE = 0, //!< child is free
72  FR_CHILD_INITIALIZING, //!< initialized, but not running
73  FR_CHILD_RUNNING, //!< running, and in the running queue
74  FR_CHILD_EXITED, //!< exited, and in the exited queue
75  FR_CHILD_FAIL //!< failed, and in the exited queue
77 
78 /** Scheduler specific information for worker threads
79  *
80  * Wraps a fr_worker_t, tracking additional information that
81  * the scheduler uses.
82  */
83 typedef struct {
84  TALLOC_CTX *ctx; //!< our allocation ctx
85  fr_event_list_t *el; //!< our event list
86  pthread_t pthread_id; //!< the thread of this worker
87 
88  unsigned int id; //!< a unique ID
89  int uses; //!< how many network threads are using it
90  fr_time_t cpu_time; //!< how much CPU time this worker has used
91 
92  fr_dlist_t entry; //!< our entry into the linked list of workers
93 
94  fr_schedule_t *sc; //!< the scheduler we are running under
95 
96  fr_schedule_child_status_t status; //!< status of the worker
97  fr_worker_t *worker; //!< the worker data structure
99 
100 /** Scheduler specific information for network threads
101  *
102  * Wraps a fr_network_t, tracking additional information that
103  * the scheduler uses.
104  */
105 typedef struct {
106  TALLOC_CTX *ctx; //!< our allocation ctx
107  pthread_t pthread_id; //!< the thread of this network
108 
109  unsigned int id; //!< a unique ID
110 
111  fr_dlist_t entry; //!< our entry into the linked list of networks
112 
113  fr_schedule_t *sc; //!< the scheduler we are running under
114 
115  fr_schedule_child_status_t status; //!< status of the worker
116  fr_network_t *nr; //!< the receive data structure
117 
118  fr_event_timer_t const *ev; //!< timer for stats_interval
120 
121 
122 /**
123  * The scheduler
124  */
126  bool running; //!< is the scheduler running?
127 
128  CONF_SECTION *cs; //!< thread pool configuration section
129  fr_event_list_t *el; //!< event list for single-threaded mode.
130 
131  fr_log_t *log; //!< log destination
132  fr_log_lvl_t lvl; //!< log level
133 
134  fr_schedule_config_t *config; //!< configuration
135 
136  unsigned int num_workers_exited; //!< number of exited workers
137 
138  sem_t worker_sem; //!< for inter-thread signaling
139  sem_t network_sem; //!< for inter-thread signaling
140 
143 
144  fr_dlist_head_t workers; //!< list of workers
145  fr_dlist_head_t networks; //!< list of networks
146 
147  fr_network_t *single_network; //!< for single-threaded mode
148  fr_worker_t *single_worker; //!< for single-threaded mode
149 };
150 
151 static _Thread_local int worker_id; //!< Internal ID of the current worker thread.
152 
153 /** Return the worker id for the current thread
154  *
155  * @return worker ID
156  */
158 {
159  return worker_id;
160 }
161 
162 /** Entry point for worker threads
163  *
164  * @param[in] arg the fr_schedule_worker_t
165  * @return NULL
166  */
167 static void *fr_schedule_worker_thread(void *arg)
168 {
169  TALLOC_CTX *ctx;
170  fr_schedule_worker_t *sw = talloc_get_type_abort(arg, fr_schedule_worker_t);
171  fr_schedule_t *sc = sw->sc;
174  char worker_name[32];
175 
176  worker_id = sw->id; /* Store the current worker ID */
177 
178  snprintf(worker_name, sizeof(worker_name), "Worker %d", sw->id);
179 
180  sw->ctx = ctx = talloc_init("%s", worker_name);
181  if (!ctx) {
182  ERROR("%s - Failed allocating memory", worker_name);
183  goto fail;
184  }
185 
186  INFO("%s - Starting", worker_name);
187 
188  sw->el = fr_event_list_alloc(ctx, NULL, NULL);
189  if (!sw->el) {
190  PERROR("%s - Failed creating event list", worker_name);
191  goto fail;
192  }
193 
194 
195  sw->worker = fr_worker_create(ctx, sw->el, worker_name, sc->log, sc->lvl, &sc->config->worker);
196  if (!sw->worker) {
197  PERROR("%s - Failed creating worker", worker_name);
198  goto fail;
199  }
200 
201  /*
202  * @todo make this a registry
203  */
204  if (sc->worker_thread_instantiate) {
205  CONF_SECTION *cs;
206  char section_name[32];
207 
208  snprintf(section_name, sizeof(section_name), "%u", sw->id);
209 
210  cs = cf_section_find(sc->cs, "worker", section_name);
211  if (!cs) cs = cf_section_find(sc->cs, "worker", NULL);
212 
213  if (sc->worker_thread_instantiate(sw->ctx, sw->el, cs) < 0) {
214  PERROR("%s - Worker thread instantiation failed", worker_name);
215  goto fail;
216  }
217  }
218 
219  sw->status = FR_CHILD_RUNNING;
220 
221  /*
222  * Add this worker to all network threads.
223  */
224  for (sn = fr_dlist_head(&sc->networks);
225  sn != NULL;
226  sn = fr_dlist_next(&sc->networks, sn)) {
227  (void) fr_network_worker_add(sn->nr, sw->worker);
228  }
229 
230  DEBUG3("%s - Started", worker_name);
231 
232  /*
233  * Tell the originator that the thread has started.
234  */
235  sem_post(&sc->worker_sem);
236 
237  /*
238  * Do all of the work.
239  */
240  fr_worker(sw->worker);
241 
242  status = FR_CHILD_EXITED;
243 
244 fail:
245  sw->status = status;
246 
247  if (sw->worker) {
249  sw->worker = NULL;
250  }
251 
252  INFO("%s - Exiting", worker_name);
253 
254  if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); /* Fixme once we figure out what uctx should be */
255 
256  /*
257  * Not looping at this point, but may catch timer/fd
258  * insertions being done after the thread should have
259  * exited.
260  */
261  if (sw->el) fr_event_loop_exit(sw->el, 1);
262 
263  /*
264  * Tell the scheduler we're done.
265  */
266  sem_post(&sc->worker_sem);
267 
268  talloc_free(ctx);
269 
270  return NULL;
271 }
272 
273 
274 static void stats_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
275 {
276  fr_schedule_network_t *sn = talloc_get_type_abort(uctx, fr_schedule_network_t);
277 
278  fr_network_stats_log(sn->nr, sn->sc->log);
279 
280  (void) fr_event_timer_at(sn, el, &sn->ev, fr_time_add(now, sn->sc->config->stats_interval), stats_timer, sn);
281 }
282 
283 /** Initialize and run the network thread.
284  *
285  * @param[in] arg the fr_schedule_network_t
286  * @return NULL
287  */
288 static void *fr_schedule_network_thread(void *arg)
289 {
290  TALLOC_CTX *ctx;
291  fr_schedule_network_t *sn = talloc_get_type_abort(arg, fr_schedule_network_t);
292  fr_schedule_t *sc = sn->sc;
295  char network_name[32];
296 
297  snprintf(network_name, sizeof(network_name), "Network %d", sn->id);
298 
299  INFO("%s - Starting", network_name);
300 
301  sn->ctx = ctx = talloc_init("%s", network_name);
302  if (!ctx) {
303  ERROR("%s - Failed allocating memory", network_name);
304  goto fail;
305  }
306 
307  el = fr_event_list_alloc(ctx, NULL, NULL);
308  if (!el) {
309  PERROR("%s - Failed creating event list", network_name);
310  goto fail;
311  }
312 
313  sn->nr = fr_network_create(ctx, el, network_name, sc->log, sc->lvl, &sc->config->network);
314  if (!sn->nr) {
315  PERROR("%s - Failed creating network", network_name);
316  goto fail;
317  }
318 
319  sn->status = FR_CHILD_RUNNING;
320 
321  /*
322  * Tell the originator that the thread has started.
323  */
324  sem_post(&sc->network_sem);
325 
326  DEBUG3("%s - Started", network_name);
327 
328  /*
329  * Print out statistics for this network IO handler.
330  */
331  if (fr_time_delta_ispos(sc->config->stats_interval)) {
332  (void) fr_event_timer_in(sn, el, &sn->ev, sn->sc->config->stats_interval, stats_timer, sn);
333  }
334  /*
335  * Call the main event processing loop of the network
336  * thread Will not return until the worker is about
337  * to exit.
338  */
339  fr_network(sn->nr);
340 
341  status = FR_CHILD_EXITED;
342 
343 fail:
344  sn->status = status;
345 
346  INFO("%s - Exiting", network_name);
347 
348  /*
349  * Tell the scheduler we're done.
350  */
351  sem_post(&sc->network_sem);
352 
353  talloc_free(ctx);
354 
355  return NULL;
356 }
357 
358 /** Creates a new thread using our standard set of options
359  *
360  * New threads are:
361  * - Joinable, i.e. you can call pthread_join on them to confirm they've exited
362  * - Immune to catchable signals.
363  *
364  * @param[out] thread handled that was created by pthread_create.
365  * @param[in] func entry point for the thread.
366  * @param[in] arg Argument to pass to func.
367  * @return
368  * - 0 on success.
369  * - -1 on failure.
370  */
371 int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *arg)
372 {
373  pthread_attr_t attr;
374  int ret;
375 
376  /*
377  * Set the thread to wait around after it's exited
378  * so it can be joined. This is more of a useful
379  * mechanism for the parent to determine if all
380  * the threads have exited so it can continue with
381  * a graceful shutdown.
382  */
383  pthread_attr_init(&attr);
384  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
385 
386  ret = pthread_create(thread, &attr, func, arg);
387  if (ret != 0) {
388  fr_strerror_printf("Failed creating thread: %s", fr_syserror(ret));
389  return -1;
390  }
391 
392  return 0;
393 }
394 
395 /** Create a scheduler and spawn the child threads.
396  *
397  * @param[in] ctx talloc context.
398  * @param[in] el event list, only for single-threaded mode.
399  * @param[in] logger destination for all logging messages.
400  * @param[in] lvl log level.
401  * @param[in] worker_thread_instantiate callback for new worker threads.
402  * @param[in] worker_thread_detach callback to destroy resources
403  * allocated by worker_thread_instantiate.
404  * @param[in] config configuration for the scheduler
405  * @return
406  * - NULL on error
407  * - fr_schedule_t new scheduler
408  */
410  fr_log_t *logger, fr_log_lvl_t lvl,
411  fr_schedule_thread_instantiate_t worker_thread_instantiate,
412  fr_schedule_thread_detach_t worker_thread_detach,
414 {
415  unsigned int i;
416  fr_schedule_worker_t *sw, *next_sw;
417  fr_schedule_network_t *sn, *next_sn;
418  fr_schedule_t *sc;
419 
420  sc = talloc_zero(ctx, fr_schedule_t);
421  if (!sc) {
422  fr_strerror_const("Failed allocating memory");
423  return NULL;
424  }
425 
426  sc->config = config;
427  sc->el = el;
428  sc->log = logger;
429  sc->lvl = lvl;
430 
431  sc->worker_thread_instantiate = worker_thread_instantiate;
432  sc->worker_thread_detach = worker_thread_detach;
433  sc->running = true;
434 
435  /*
436  * If we're single-threaded, create network / worker, and insert them into the event loop.
437  */
438  if (el) {
439  sc->single_network = fr_network_create(sc, el, "Network", sc->log, sc->lvl, &sc->config->network);
440  if (!sc->single_network) {
441  PERROR("Failed creating network");
442  pre_instantiate_st_fail:
443  talloc_free(sc);
444  return NULL;
445  }
446 
447  sc->single_worker = fr_worker_create(sc, el, "Worker", sc->log, sc->lvl, &sc->config->worker);
448  if (!sc->single_worker) {
449  PERROR("Failed creating worker");
450  fr_network_destroy(sc->single_network);
451  goto pre_instantiate_st_fail;
452  }
453 
454  /*
455  * Parent thread-specific data from the single_worker
456  */
457  if (sc->worker_thread_instantiate) {
458  CONF_SECTION *subcs;
459 
460  subcs = cf_section_find(sc->cs, "worker", "0");
461  if (!subcs) subcs = cf_section_find(sc->cs, "worker", NULL);
462 
463  if (sc->worker_thread_instantiate(sc->single_worker, el, subcs) < 0) {
464  PERROR("Worker thread instantiation failed");
465  destroy_both:
466  fr_network_destroy(sc->single_network);
467  fr_worker_destroy(sc->single_worker);
468  goto pre_instantiate_st_fail;
469  }
470  }
471 
472  if (fr_command_register_hook(NULL, "0", sc->single_worker, cmd_worker_table) < 0) {
473  PERROR("Failed adding worker commands");
474  st_fail:
475  if (sc->worker_thread_detach) sc->worker_thread_detach(NULL);
476  goto destroy_both;
477  }
478 
479  if (fr_command_register_hook(NULL, "0", sc->single_network, cmd_network_table) < 0) {
480  PERROR("Failed adding network commands");
481  goto st_fail;
482  }
483 
484  (void) fr_network_worker_add(sc->single_network, sc->single_worker);
485  DEBUG("Scheduler created in single-threaded mode");
486 
487  if (fr_event_pre_insert(el, fr_worker_pre_event, sc->single_worker) < 0) {
488  fr_strerror_const("Failed adding pre-check to event list");
489  goto st_fail;
490  }
491 
492  /*
493  * Add the event which processes request_t packets.
494  */
495  if (fr_event_post_insert(el, fr_worker_post_event, sc->single_worker) < 0) {
496  fr_strerror_const("Failed inserting post-processing event");
497  goto st_fail;
498  }
499 
500  return sc;
501  }
502 
503  /*
504  * Parse any scheduler-specific configuration.
505  */
506  if (!config) {
507  MEM(sc->config = talloc_zero(sc, fr_schedule_config_t));
508  sc->config->max_networks = 1;
509  sc->config->max_workers = 4;
510  } else {
511  sc->config = config;
512 
513  if (sc->config->max_networks < 1) sc->config->max_networks = 1;
514  if (sc->config->max_networks > 64) sc->config->max_networks = 64;
515  if (sc->config->max_workers < 1) sc->config->max_workers = 1;
516  if (sc->config->max_workers > 64) sc->config->max_workers = 64;
517  }
518 
519  /*
520  * Create the lists which hold the workers and networks.
521  */
522  fr_dlist_init(&sc->workers, fr_schedule_worker_t, entry);
523  fr_dlist_init(&sc->networks, fr_schedule_network_t, entry);
524 
525  memset(&sc->network_sem, 0, sizeof(sc->network_sem));
526  if (sem_init(&sc->network_sem, 0, SEMAPHORE_LOCKED) != 0) {
527  ERROR("Failed creating semaphore: %s", fr_syserror(errno));
528  talloc_free(sc);
529  return NULL;
530  }
531 
532  memset(&sc->worker_sem, 0, sizeof(sc->worker_sem));
533  if (sem_init(&sc->worker_sem, 0, SEMAPHORE_LOCKED) != 0) {
534  ERROR("Failed creating semaphore: %s", fr_syserror(errno));
535  talloc_free(sc);
536  return NULL;
537  }
538 
539  /*
540  * Create the network threads first.
541  */
542  for (i = 0; i < sc->config->max_networks; i++) {
543  DEBUG3("Creating %u/%u networks", i + 1, sc->config->max_networks);
544 
545  /*
546  * Create a worker "glue" structure
547  */
548  sn = talloc_zero(sc, fr_schedule_network_t);
549  if (!sn) {
550  ERROR("Network %u - Failed allocating memory", i);
551  break;
552  }
553 
554  sn->id = i;
555  sn->sc = sc;
557  fr_dlist_insert_head(&sc->networks, sn);
558 
560  PERROR("Failed creating network %u", i);
561  break;
562  }
563  }
564 
565  /*
566  * Wait for all of the networks to signal us that either
567  * they've started, OR there's been a problem and they
568  * can't start.
569  */
570  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) {
571  DEBUG3("Waiting for semaphore from network %u/%u",
572  i + 1, (unsigned int)fr_dlist_num_elements(&sc->networks));
573  SEM_WAIT_INTR(&sc->network_sem);
574  }
575 
576  /*
577  * See if all of the networks have started.
578  */
579  for (sn = fr_dlist_head(&sc->networks);
580  sn != NULL;
581  sn = next_sn) {
582  next_sn = fr_dlist_next(&sc->networks, sn);
583 
584  if (sn->status != FR_CHILD_RUNNING) {
585  fr_dlist_remove(&sc->networks, sn);
586  continue;
587  }
588  }
589 
590  /*
591  * Failed to start some workers, refuse to do anything!
592  */
593  if ((unsigned int)fr_dlist_num_elements(&sc->networks) < sc->config->max_networks) {
595  return NULL;
596  }
597 
598  /*
599  * Create all of the workers.
600  */
601  for (i = 0; i < sc->config->max_workers; i++) {
602  DEBUG3("Creating %u/%u workers", i + 1, sc->config->max_workers);
603 
604  /*
605  * Create a worker "glue" structure
606  */
607  sw = talloc_zero(sc, fr_schedule_worker_t);
608  if (!sw) {
609  ERROR("Worker %u - Failed allocating memory", i);
610  break;
611  }
612 
613  sw->id = i;
614  sw->sc = sc;
616  fr_dlist_insert_head(&sc->workers, sw);
617 
619  PERROR("Failed creating worker %u", i);
620  break;
621  }
622  }
623 
624  /*
625  * Wait for all of the workers to signal us that either
626  * they've started, OR there's been a problem and they
627  * can't start.
628  */
629  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) {
630  DEBUG3("Waiting for semaphore from worker %u/%u",
631  i + 1, (unsigned int)fr_dlist_num_elements(&sc->workers));
632  SEM_WAIT_INTR(&sc->worker_sem);
633  }
634 
635  /*
636  * See if all of the workers have started.
637  */
638  for (sw = fr_dlist_head(&sc->workers);
639  sw != NULL;
640  sw = next_sw) {
641 
642  next_sw = fr_dlist_next(&sc->workers, sw);
643 
644  if (sw->status != FR_CHILD_RUNNING) {
645  fr_dlist_remove(&sc->workers, sw);
646  continue;
647  }
648  }
649 
650  /*
651  * Failed to start some workers, refuse to do anything!
652  */
653  if ((unsigned int)fr_dlist_num_elements(&sc->workers) < sc->config->max_workers) {
655  return NULL;
656  }
657 
658  for (sw = fr_dlist_head(&sc->workers), i = 0;
659  sw != NULL;
660  sw = next_sw, i++) {
661  char buffer[32];
662 
663  next_sw = fr_dlist_next(&sc->workers, sw);
664 
665  snprintf(buffer, sizeof(buffer), "%d", i);
667  PERROR("Failed adding worker commands");
668  goto st_fail;
669  }
670  }
671 
672  for (sn = fr_dlist_head(&sc->networks), i = 0;
673  sn != NULL;
674  sn = next_sn, i++) {
675  char buffer[32];
676 
677  next_sn = fr_dlist_next(&sc->networks, sn);
678 
679  snprintf(buffer, sizeof(buffer), "%d", i);
680  if (fr_command_register_hook(NULL, buffer, sn->nr, cmd_network_table) < 0) {
681  PERROR("Failed adding network commands");
682  goto st_fail;
683  }
684  }
685 
686  if (sc) INFO("Scheduler created successfully with %u networks and %u workers",
687  sc->config->max_networks, (unsigned int)fr_dlist_num_elements(&sc->workers));
688 
689  return sc;
690 }
691 
692 /** Destroy a scheduler, and tell its child threads to exit.
693  *
694  * @note This may be called with no worker or network threads in the case of a
695  * instantiation error. This function _should_ deal with that condition
696  * gracefully.
697  *
698  * @param[in] sc_to_free the scheduler
699  * @return
700  * - <0 on error
701  * - 0 on success
702  */
704 {
705  fr_schedule_t *sc = *sc_to_free;
706  unsigned int i;
709  int ret;
710 
711  if (!sc) return 0;
712 
713  sc->running = false;
714 
715  /*
716  * Single threaded mode: kill the only network / worker we have.
717  */
718  if (sc->el) {
719  /*
720  * Destroy the network side first. It tells the
721  * workers to close.
722  */
723  fr_network_destroy(sc->single_network);
724  fr_worker_destroy(sc->single_worker);
725  goto done;
726  }
727 
728  /*
729  * Signal each network thread to exit.
730  */
731  for (sn = fr_dlist_head(&sc->networks);
732  sn != NULL;
733  sn = fr_dlist_next(&sc->networks, sn)) {
734  fr_network_exit(sn->nr);
735  }
736 
737  /*
738  * If the network threads are running, tell them to exit,
739  * and wait for them to do so. Each network thread tells
740  * all of its worker threads that it's exiting. It then
741  * closes the channels. When the workers see that there
742  * are no input channels, they exit, too.
743  */
744  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->networks); i++) {
745  DEBUG2("Scheduler - Waiting for semaphore indicating network exit %u/%u", i + 1,
746  (unsigned int)fr_dlist_num_elements(&sc->networks));
747  SEM_WAIT_INTR(&sc->network_sem);
748  }
749  DEBUG2("Scheduler - All networks indicated exit complete");
750 
751  while ((sn = fr_dlist_head(&sc->networks)) != NULL) {
752  fr_dlist_remove(&sc->networks, sn);
753 
754  /*
755  * Ensure that the thread has exited before
756  * cleaning up the context.
757  *
758  * This also ensures that the child threads have
759  * exited before the main thread cleans up the
760  * module instances.
761  */
762  if ((ret = pthread_join(sn->pthread_id, NULL)) != 0) {
763  ERROR("Failed joining network %i: %s", sn->id, fr_syserror(ret));
764  } else {
765  DEBUG2("Network %i joined (cleaned up)", sn->id);
766  }
767  }
768 
769  /*
770  * Wait for all worker threads to finish. THEN clean up
771  * modules. Otherwise, the modules will be removed from
772  * underneath the workers!
773  */
774  for (i = 0; i < (unsigned int)fr_dlist_num_elements(&sc->workers); i++) {
775  DEBUG2("Scheduler - Waiting for semaphore indicating worker exit %u/%u", i + 1,
776  (unsigned int)fr_dlist_num_elements(&sc->workers));
777  SEM_WAIT_INTR(&sc->worker_sem);
778  }
779  DEBUG2("Scheduler - All workers indicated exit complete");
780 
781  /*
782  * Clean up the exited workers.
783  */
784  while ((sw = fr_dlist_head(&sc->workers)) != NULL) {
785  fr_dlist_remove(&sc->workers, sw);
786 
787  /*
788  * Ensure that the thread has exited before
789  * cleaning up the context.
790  *
791  * This also ensures that the child threads have
792  * exited before the main thread cleans up the
793  * module instances.
794  */
795  if ((ret = pthread_join(sw->pthread_id, NULL)) != 0) {
796  ERROR("Failed joining worker %i: %s", sw->id, fr_syserror(ret));
797  } else {
798  DEBUG2("Worker %i joined (cleaned up)", sw->id);
799  }
800  }
801 
802  sem_destroy(&sc->network_sem);
803  sem_destroy(&sc->worker_sem);
804 done:
805  /*
806  * Now that all of the workers are done, we can return to
807  * the caller, and have it dlclose() the modules.
808  */
809  talloc_free(sc);
810  *sc_to_free = NULL;
811 
812  return 0;
813 }
814 
815 /** Add a fr_listen_t to a scheduler.
816  *
817  * @param[in] sc the scheduler
818  * @param[in] li the ctx and callbacks for the transport.
819  * @return
820  * - NULL on error
821  * - the fr_network_t that the socket was added to.
822  */
824 {
825  fr_network_t *nr;
826 
827  (void) talloc_get_type_abort(sc, fr_schedule_t);
828 
829  if (sc->el) {
830  nr = sc->single_network;
831  } else {
833 
834  /*
835  * @todo - round robin it among the listeners?
836  * or maybe add it to the same parent thread?
837  */
838  sn = fr_dlist_head(&sc->networks);
839  nr = sn->nr;
840  }
841 
842  if (fr_network_listen_add(nr, li) < 0) return NULL;
843 
844  return nr;
845 }
846 
847 /** Add a directory NOTE_EXTEND to a scheduler.
848  *
849  * @param[in] sc the scheduler
850  * @param[in] li the ctx and callbacks for the transport.
851  * @return
852  * - NULL on error
853  * - the fr_network_t that the socket was added to.
854  */
856 {
857  fr_network_t *nr;
858 
859  (void) talloc_get_type_abort(sc, fr_schedule_t);
860 
861  if (sc->el) {
862  nr = sc->single_network;
863  } else {
865 
866  /*
867  * @todo - round robin it among the listeners?
868  * or maybe add it to the same parent thread?
869  */
870  sn = fr_dlist_head(&sc->networks);
871  nr = sn->nr;
872  }
873 
874  if (fr_network_directory_add(nr, li) < 0) return NULL;
875 
876  return nr;
877 }
static int const char char buffer[256]
Definition: acutest.h:574
#define RCSID(id)
Definition: build.h:444
A section grouping multiple CONF_PAIR.
Definition: cf_priv.h:89
CONF_SECTION * cf_section_find(CONF_SECTION const *cs, char const *name1, char const *name2)
Find a CONF_SECTION with name1 and optionally name2.
Definition: cf_util.c:970
fr_command_register_hook_t fr_command_register_hook
Definition: command.c:42
#define ERROR(fmt,...)
Definition: dhcpclient.c:41
#define DEBUG(fmt,...)
Definition: dhcpclient.c:39
#define fr_dlist_init(_head, _type, _field)
Initialise the head structure of a doubly linked list.
Definition: dlist.h:260
static void * fr_dlist_next(fr_dlist_head_t const *list_head, void const *ptr)
Get the next item in a list.
Definition: dlist.h:555
static unsigned int fr_dlist_num_elements(fr_dlist_head_t const *head)
Return the number of elements in the dlist.
Definition: dlist.h:939
static void * fr_dlist_head(fr_dlist_head_t const *list_head)
Return the HEAD item of a list or NULL if the list is empty.
Definition: dlist.h:486
static void * fr_dlist_remove(fr_dlist_head_t *list_head, void *ptr)
Remove an item from the list.
Definition: dlist.h:638
static int fr_dlist_insert_head(fr_dlist_head_t *list_head, void *ptr)
Insert an item into the head of a list.
Definition: dlist.h:338
Head of a doubly linked list.
Definition: dlist.h:51
Entry in a doubly linked list.
Definition: dlist.h:41
#define fr_event_timer_at(...)
Definition: event.h:250
#define fr_event_timer_in(...)
Definition: event.h:255
fr_cmd_table_t cmd_network_table[]
Definition: network.c:2073
int fr_network_listen_add(fr_network_t *nr, fr_listen_t *li)
Add a fr_listen_t to a network.
Definition: network.c:236
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker)
Add a worker to a network.
Definition: network.c:293
int fr_network_destroy(fr_network_t *nr)
Stop a network thread in an orderly way.
Definition: network.c:1664
int fr_network_directory_add(fr_network_t *nr, fr_listen_t *li)
Add a "watch directory" call to a network.
Definition: network.c:278
fr_network_t * fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_network_config_t const *config)
Create a network.
Definition: network.c:1865
void fr_network(fr_network_t *nr)
The main network worker function.
Definition: network.c:1777
int fr_network_exit(fr_network_t *nr)
Signal a network thread to exit.
Definition: network.c:1832
void fr_network_stats_log(fr_network_t const *nr, fr_log_t const *log)
Definition: network.c:2006
#define PERROR(_fmt,...)
Definition: log.h:228
#define DEBUG3(_fmt,...)
Definition: log.h:266
fr_event_list_t * fr_event_list_alloc(TALLOC_CTX *ctx, fr_event_status_cb_t status, void *status_uctx)
Initialise a new event list.
Definition: event.c:2892
talloc_free(reap)
int fr_event_post_insert(fr_event_list_t *el, fr_event_timer_cb_t callback, void *uctx)
Add a post-event callback to the event list.
Definition: event.c:2306
int fr_event_pre_insert(fr_event_list_t *el, fr_event_status_cb_t callback, void *uctx)
Add a pre-event callback to the event list.
Definition: event.c:2252
void fr_event_loop_exit(fr_event_list_t *el, int code)
Signal an event loop exit with the specified code.
Definition: event.c:2737
Stores all information relating to an event list.
Definition: event.c:411
A timer event.
Definition: event.c:102
fr_log_lvl_t
Definition: log.h:67
static const conf_parser_t config[]
Definition: base.c:188
static bool done
Definition: radclient.c:80
#define DEBUG2(fmt,...)
Definition: radclient.h:43
#define INFO(fmt,...)
Definition: radict.c:54
CONF_SECTION * cs
thread pool configuration section
Definition: schedule.c:128
TALLOC_CTX * ctx
our allocation ctx
Definition: schedule.c:106
sem_t worker_sem
for inter-thread signaling
Definition: schedule.c:138
fr_event_timer_t const * ev
timer for stats_interval
Definition: schedule.c:118
fr_schedule_child_status_t status
status of the worker
Definition: schedule.c:115
static _Thread_local int worker_id
Internal ID of the current worker thread.
Definition: schedule.c:151
fr_event_list_t * el
event list for single-threaded mode.
Definition: schedule.c:129
fr_schedule_t * sc
the scheduler we are running under
Definition: schedule.c:113
fr_worker_t * single_worker
for single-threaded mode
Definition: schedule.c:148
static void stats_timer(fr_event_list_t *el, fr_time_t now, void *uctx)
Definition: schedule.c:274
fr_log_lvl_t lvl
log level
Definition: schedule.c:132
#define SEM_WAIT_INTR(_x)
Definition: schedule.c:65
fr_schedule_config_t * config
configuration
Definition: schedule.c:134
fr_network_t * single_network
for single-threaded mode
Definition: schedule.c:147
fr_schedule_thread_instantiate_t worker_thread_instantiate
thread instantiation callback
Definition: schedule.c:141
fr_schedule_child_status_t
Track the child thread status.
Definition: schedule.c:70
@ FR_CHILD_FAIL
failed, and in the exited queue
Definition: schedule.c:75
@ FR_CHILD_FREE
child is free
Definition: schedule.c:71
@ FR_CHILD_RUNNING
running, and in the running queue
Definition: schedule.c:73
@ FR_CHILD_EXITED
exited, and in the exited queue
Definition: schedule.c:74
@ FR_CHILD_INITIALIZING
initialized, but not running
Definition: schedule.c:72
fr_network_t * nr
the receive data structure
Definition: schedule.c:116
fr_schedule_thread_detach_t worker_thread_detach
Definition: schedule.c:142
bool running
is the scheduler running?
Definition: schedule.c:126
int fr_schedule_worker_id(void)
Return the worker id for the current thread.
Definition: schedule.c:157
int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *arg)
Creates a new thread using our standard set of options.
Definition: schedule.c:371
unsigned int num_workers_exited
number of exited workers
Definition: schedule.c:136
sem_t network_sem
for inter-thread signaling
Definition: schedule.c:139
static void * fr_schedule_network_thread(void *arg)
Initialize and run the network thread.
Definition: schedule.c:288
fr_schedule_t * fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_log_t *logger, fr_log_lvl_t lvl, fr_schedule_thread_instantiate_t worker_thread_instantiate, fr_schedule_thread_detach_t worker_thread_detach, fr_schedule_config_t *config)
Create a scheduler and spawn the child threads.
Definition: schedule.c:409
int fr_schedule_destroy(fr_schedule_t **sc_to_free)
Destroy a scheduler, and tell its child threads to exit.
Definition: schedule.c:703
static void * fr_schedule_worker_thread(void *arg)
Entry point for worker threads.
Definition: schedule.c:167
unsigned int id
a unique ID
Definition: schedule.c:109
fr_network_t * fr_schedule_directory_add(fr_schedule_t *sc, fr_listen_t *li)
Add a directory NOTE_EXTEND to a scheduler.
Definition: schedule.c:855
fr_dlist_head_t networks
list of networks
Definition: schedule.c:145
pthread_t pthread_id
the thread of this network
Definition: schedule.c:107
fr_network_t * fr_schedule_listen_add(fr_schedule_t *sc, fr_listen_t *li)
Add a fr_listen_t to a scheduler.
Definition: schedule.c:823
fr_log_t * log
log destination
Definition: schedule.c:131
fr_dlist_head_t workers
list of workers
Definition: schedule.c:144
fr_dlist_t entry
our entry into the linked list of networks
Definition: schedule.c:111
#define SEMAPHORE_LOCKED
Definition: schedule.c:46
Scheduler specific information for network threads.
Definition: schedule.c:105
The scheduler.
Definition: schedule.c:125
int(* fr_schedule_thread_instantiate_t)(TALLOC_CTX *ctx, fr_event_list_t *el, void *uctx)
Setup a new thread.
Definition: schedule.h:55
void(* fr_schedule_thread_detach_t)(void *uctx)
Explicitly free resources allocated by fr_schedule_thread_instantiate_t.
Definition: schedule.h:61
fr_time_delta_t stats_interval
print channel statistics
Definition: schedule.h:70
static const uchar sc[16]
Definition: smbdes.c:115
PUBLIC int snprintf(char *string, size_t length, char *format, va_alist)
Definition: snprintf.c:689
MEM(pair_append_request(&vp, attr_eap_aka_sim_identity) >=0)
Definition: log.h:96
char const * fr_syserror(int num)
Guaranteed to be thread-safe version of strerror.
Definition: syserror.c:243
#define fr_time_delta_ispos(_a)
Definition: time.h:288
#define fr_time_add(_a, _b)
Add a time/time delta together.
Definition: time.h:196
"server local" time.
Definition: time.h:69
static fr_event_list_t * el
#define fr_strerror_printf(_fmt,...)
Log to thread local error buffer.
Definition: strerror.h:64
#define fr_strerror_const(_msg)
Definition: strerror.h:223
int fr_worker_pre_event(UNUSED fr_time_t now, UNUSED fr_time_delta_t wake, void *uctx)
Pre-event handler.
Definition: worker.c:1544
fr_cmd_table_t cmd_worker_table[]
Definition: worker.c:1731
void fr_worker_destroy(fr_worker_t *worker)
Destroy a worker.
Definition: worker.c:1009
fr_worker_t * fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config)
Create a worker.
Definition: worker.c:1353
void fr_worker_post_event(UNUSED fr_event_list_t *el, UNUSED fr_time_t now, void *uctx)
Post-event handler.
Definition: worker.c:1565
void fr_worker(fr_worker_t *worker)
The main loop and entry point of the stand-alone worker thread.
Definition: worker.c:1490
A worker which takes packets from a master, and processes them.
Definition: worker.c:94
unsigned int id
a unique ID
Definition: schedule.c:88
int uses
how many network threads are using it
Definition: schedule.c:89
pthread_t pthread_id
the thread of this worker
Definition: schedule.c:86
fr_schedule_t * sc
the scheduler we are running under
Definition: schedule.c:94
TALLOC_CTX * ctx
our allocation ctx
Definition: schedule.c:84
fr_event_list_t * el
our event list
Definition: schedule.c:85
fr_time_t cpu_time
how much CPU time this worker has used
Definition: schedule.c:90
fr_dlist_t entry
our entry into the linked list of workers
Definition: schedule.c:92
fr_schedule_child_status_t status
status of the worker
Definition: schedule.c:96
fr_worker_t * worker
the worker data structure
Definition: schedule.c:97
Scheduler specific information for worker threads.
Definition: schedule.c:83