threads.c
/*
 * threads.c	request threading support
 *
 * Version:	$Id: 8688452aac475c96d8efcc3b25ad8644efd1f52f $
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 *
 * Copyright 2000,2006  The FreeRADIUS server project
 * Copyright 2000  Alan DeKok <aland@ox.org>
 */

RCSID("$Id: 8688452aac475c96d8efcc3b25ad8644efd1f52f $")
USES_APPLE_DEPRECATED_API	/* OpenSSL API has been deprecated by Apple */

#include <freeradius-devel/radiusd.h>
#include <freeradius-devel/process.h>
#include <freeradius-devel/heap.h>
#include <freeradius-devel/rad_assert.h>

/*
 *	Other OSes have sem_init, OS X doesn't.
 */
#ifdef HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif

#ifdef __APPLE__
# ifdef WITH_GCD
#  include <dispatch/dispatch.h>
# endif
# include <mach/task.h>
# include <mach/mach_init.h>
# include <mach/semaphore.h>

# ifndef WITH_GCD
#  undef sem_t
#  define sem_t semaphore_t
#  undef sem_init
#  define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
#  undef sem_wait
#  define sem_wait(s) semaphore_wait(*s)
#  undef sem_post
#  define sem_post(s) semaphore_signal(*s)
# endif	/* WITH_GCD */
#endif	/* __APPLE__ */
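
/*
 *	A note on the shim above: on OS X, sem_init() is declared but not
 *	implemented (it fails with ENOSYS), so when we're not using GCD the
 *	macros remap the unnamed POSIX semaphore API onto the Mach calls
 *	semaphore_create/semaphore_wait/semaphore_signal.  The '*' in the
 *	sem_wait/sem_post replacements is because the Mach calls take the
 *	semaphore by value, while the POSIX API passes a pointer.
 */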

#ifdef HAVE_SYS_WAIT_H
# include <sys/wait.h>
#endif

#ifdef HAVE_PTHREAD_H

#ifdef HAVE_OPENSSL_CRYPTO_H
# include <openssl/crypto.h>
#endif
#ifdef HAVE_OPENSSL_ERR_H
# include <openssl/err.h>
#endif
#ifdef HAVE_OPENSSL_EVP_H
# include <openssl/evp.h>
#endif

#ifdef HAVE_GPERFTOOLS_PROFILER_H
# include <gperftools/profiler.h>
#endif

#ifndef WITH_GCD
# define SEMAPHORE_LOCKED	(0)

# define THREAD_RUNNING		(1)
# define THREAD_CANCELLED	(2)
# define THREAD_EXITED		(3)

/*
 *	A data structure which contains the information about
 *	the current thread.
 */
typedef struct THREAD_HANDLE {
	struct THREAD_HANDLE *prev;	//!< Previous thread handle (in the linked list).
	struct THREAD_HANDLE *next;	//!< Next thread handle (in the linked list).
	pthread_t pthread_id;		//!< pthread_id.
	int thread_num;			//!< Server thread number, 1...number of threads.
	int status;			//!< Is the thread running or exited?
	unsigned int request_count;	//!< The number of requests that this thread has handled.
	time_t timestamp;		//!< When the thread started executing.
	REQUEST *request;		//!< The request the thread is currently processing.
} THREAD_HANDLE;
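
/*
 *	Thread lifecycle, as a summary of the code below: spawn_thread()
 *	creates a handle with status THREAD_RUNNING and links it into the
 *	head/tail list.  thread_pool_manage() may later flip an idle thread
 *	to THREAD_CANCELLED; the thread notices, sets THREAD_EXITED on its
 *	way out, and the main thread then pthread_join()s it and calls
 *	delete_thread() to unlink and free the handle.
 */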

#endif	/* WITH_GCD */

typedef struct thread_fork_t {
	pid_t		pid;
	int		status;
	int		exited;
} thread_fork_t;
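
/*
 *	How these entries are used (see rad_fork/rad_waitpid below): when a
 *	worker fork()s a child, rad_fork() stores a thread_fork_t keyed by
 *	PID in thread_pool.waiters.  reap_children() fills in 'status' and
 *	sets 'exited' when waitpid() reaps that PID, and rad_waitpid()
 *	polls the entry until 'exited' is set, or it gives up.
 */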


#ifdef WITH_STATS
typedef struct fr_pps_t {
	uint32_t	pps_old;
	uint32_t	pps_now;
	uint32_t	pps;
	time_t		time_old;
} fr_pps_t;
#endif


/*
 *	A data structure to manage the thread pool.  There's no real
 *	need for a data structure, but it makes things conceptually
 *	easier.
 */
typedef struct THREAD_POOL {
#ifndef WITH_GCD
	THREAD_HANDLE	*head;
	THREAD_HANDLE	*tail;

	uint32_t	active_threads;	/* protected by queue_mutex */
	uint32_t	total_threads;

	uint32_t	exited_threads;
	uint32_t	max_thread_num;
	uint32_t	start_threads;
	uint32_t	max_threads;
	uint32_t	min_spare_threads;
	uint32_t	max_spare_threads;
	uint32_t	max_requests_per_thread;
	uint32_t	request_count;
	time_t		time_last_spawned;
	uint32_t	cleanup_delay;
	bool		stop_flag;
#endif	/* WITH_GCD */
	bool		spawn_workers;

#ifdef WNOHANG
	pthread_mutex_t	wait_mutex;
	fr_hash_table_t *waiters;
#endif

#ifdef WITH_GCD
	dispatch_queue_t queue;
#else

# ifdef WITH_STATS
	fr_pps_t	pps_in, pps_out;
# ifdef WITH_ACCOUNTING
	bool		auto_limit_acct;
# endif
# endif

	/*
	 *	All threads wait on this semaphore, for requests
	 *	to enter the queue.
	 */
	sem_t		semaphore;

	/*
	 *	To ensure only one thread at a time touches the queue.
	 */
	pthread_mutex_t	queue_mutex;

	char const	*queue_priority;
	fr_heap_cmp_t	heap_cmp;

	uint32_t	max_queue_size;
	fr_heap_t	*heap;
#endif	/* WITH_GCD */
} THREAD_POOL;

static THREAD_POOL thread_pool;
static bool pool_initialized = false;

#ifndef WITH_GCD
static time_t last_cleaned = 0;

static void thread_pool_manage(time_t now);
#endif

#ifndef WITH_GCD
/*
 *	A mapping of configuration file names to internal integers
 */
static const CONF_PARSER thread_config[] = {
	{ FR_CONF_POINTER("start_servers", PW_TYPE_INTEGER, &thread_pool.start_threads), .dflt = "5" },
	{ FR_CONF_POINTER("max_servers", PW_TYPE_INTEGER, &thread_pool.max_threads), .dflt = "32" },
	{ FR_CONF_POINTER("min_spare_servers", PW_TYPE_INTEGER, &thread_pool.min_spare_threads), .dflt = "3" },
	{ FR_CONF_POINTER("max_spare_servers", PW_TYPE_INTEGER, &thread_pool.max_spare_threads), .dflt = "10" },
	{ FR_CONF_POINTER("max_requests_per_server", PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), .dflt = "0" },
	{ FR_CONF_POINTER("cleanup_delay", PW_TYPE_INTEGER, &thread_pool.cleanup_delay), .dflt = "5" },
	{ FR_CONF_POINTER("max_queue_size", PW_TYPE_INTEGER, &thread_pool.max_queue_size), .dflt = "65536" },
	{ FR_CONF_POINTER("queue_priority", PW_TYPE_STRING, &thread_pool.queue_priority), .dflt = NULL },
# ifdef WITH_STATS
# ifdef WITH_ACCOUNTING
	{ FR_CONF_POINTER("auto_limit_acct", PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct) },
# endif
# endif
	CONF_PARSER_TERMINATOR
};
#endif
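
/*
 *	For illustration, a radiusd.conf "thread pool" section matching
 *	the parser table above, using the default values (a sketch based
 *	on the parameter names and defaults, not text from this file):
 *
 *	thread pool {
 *		start_servers		= 5
 *		max_servers		= 32
 *		min_spare_servers	= 3
 *		max_spare_servers	= 10
 *		max_requests_per_server	= 0
 *		cleanup_delay		= 5
 *		max_queue_size		= 65536
 *	}
 */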

#ifdef WNOHANG
/*
 *	We don't want to catch SIGCHLD for a host of reasons.
 *
 *	- exec_wait means that someone, somewhere, somewhen, will
 *	call waitpid(), and catch the child.
 *
 *	- SIGCHLD is delivered to a random thread, not the one that
 *	forked.
 *
 *	- if another thread catches the child, we have to coordinate
 *	with the thread doing the waiting.
 *
 *	- if we don't waitpid() for non-wait children, they'll be zombies,
 *	and will hang around forever.
 *
 */
static void reap_children(void)
{
	pid_t pid;
	int status;
	thread_fork_t mytf, *tf;


	pthread_mutex_lock(&thread_pool.wait_mutex);

	do {
	retry:
		pid = waitpid(0, &status, WNOHANG);
		if (pid <= 0) break;

		mytf.pid = pid;
		tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
		if (!tf) goto retry;

		tf->status = status;
		tf->exited = 1;
	} while (fr_hash_table_num_elements(thread_pool.waiters) > 0);

	pthread_mutex_unlock(&thread_pool.wait_mutex);
}
#else
# define reap_children()
#endif	/* WNOHANG */

#ifndef WITH_GCD
/*
 *	Add a request to the list of waiting requests.
 *	This function gets called ONLY from the main handler thread...
 *
 *	This function should never fail.
 */
int request_enqueue(REQUEST *request)
{
	/*
	 *	If we haven't checked the number of child threads
	 *	in a while, OR if the thread pool appears to be full,
	 *	go manage it.
	 */
	if ((last_cleaned < request->timestamp.tv_sec) ||
	    (thread_pool.active_threads == thread_pool.total_threads) ||
	    (thread_pool.exited_threads > 0)) {
		thread_pool_manage(request->timestamp.tv_sec);
	}


	pthread_mutex_lock(&thread_pool.queue_mutex);

# if defined(WITH_STATS) && defined(WITH_ACCOUNTING)
	if (thread_pool.auto_limit_acct) {
		struct timeval now;

		/*
		 *	Throw away accounting requests if we're too
		 *	busy.  The NAS should retransmit these, and no
		 *	one should notice.
		 *
		 *	In contrast, we always try to process
		 *	authentication requests.  Those are more time
		 *	critical, and it's harder to determine which
		 *	we can throw away, and which we can keep.
		 *
		 *	We allow the queue to get half full before we
		 *	start worrying.  Even then, we still require
		 *	that the rate of input packets is higher than
		 *	the rate of outgoing packets.  i.e. the queue
		 *	is growing.
		 *
		 *	Once that happens, we roll dice to see where
		 *	the barrier is for "keep" versus "toss".  If
		 *	the queue is smaller than the barrier, we
		 *	allow it.  If the queue is larger than the
		 *	barrier, we throw the packet away.  Otherwise,
		 *	we keep it.
		 *
		 *	i.e. the probability of throwing the packet
		 *	away increases from 0 (queue is half full), to
		 *	100 percent (queue is completely full).
		 *
		 *	A probabilistic approach allows us to process
		 *	SOME of the new accounting packets.
		 */
		if ((request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
		    (fr_heap_num_elements(thread_pool.heap) > (thread_pool.max_queue_size / 2)) &&
		    (thread_pool.pps_in.pps_now > thread_pool.pps_out.pps_now)) {
			uint32_t prob;
			uint32_t keep;

			/*
			 *	Take a random value of how full we
			 *	want the queue to be.  It's OK to be
			 *	half full, but we get excited over
			 *	anything more than that.
			 */
			keep = (thread_pool.max_queue_size / 2);
			prob = fr_rand() & ((1 << 10) - 1);
			keep *= prob;
			keep >>= 10;
			keep += (thread_pool.max_queue_size / 2);

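			/*
			 *	Worked example (with the default
			 *	max_queue_size of 65536): half full is
			 *	32768 entries.  prob is uniform in
			 *	[0, 1023], so keep = 32768 +
			 *	(32768 * prob) / 1024, i.e. a uniformly
			 *	random barrier in [32768, 65504].  The
			 *	deeper the queue is beyond half full,
			 *	the more of those barriers it exceeds,
			 *	so the drop probability rises roughly
			 *	linearly from 0% at half full to ~100%
			 *	at completely full.
			 */
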
			/*
			 *	If the queue is larger than our dice
			 *	roll, we throw the packet away.
			 */
			if (fr_heap_num_elements(thread_pool.heap) > keep) {
				pthread_mutex_unlock(&thread_pool.queue_mutex);
				return 0;
			}
		}

		gettimeofday(&now, NULL);

		/*
		 *	Calculate the instantaneous arrival rate into
		 *	the queue.
		 */
		thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
						 &thread_pool.pps_in.pps_now,
						 &thread_pool.pps_in.time_old,
						 &now);

		thread_pool.pps_in.pps_now++;
	}
# endif

	thread_pool.request_count++;

	if (fr_heap_num_elements(thread_pool.heap) >= thread_pool.max_queue_size) {
		pthread_mutex_unlock(&thread_pool.queue_mutex);

		/*
		 *	Complain, and return 0 so that the caller
		 *	marks the request as done.
		 */
		RATE_LIMIT(ERROR("Something is blocking the server. There are %zu packets in the queue, "
				 "waiting to be processed. Ignoring the new request.", fr_heap_num_elements(thread_pool.heap)));
		return 0;
	}
	request->component = "<core>";
	request->module = "<queue>";
	request->child_state = REQUEST_QUEUED;

	/*
	 *	Push the request onto the incoming heap
	 */
	if (!fr_heap_insert(thread_pool.heap, request)) {
		pthread_mutex_unlock(&thread_pool.queue_mutex);
		ERROR("!!! ERROR !!! Failed inserting request %d into the queue", request->number);
		return 0;
	}

	pthread_mutex_unlock(&thread_pool.queue_mutex);

	/*
	 *	There's one more request in the queue.
	 *
	 *	Note that we're not touching the queue any more, so
	 *	the semaphore post is outside of the mutex.  This also
	 *	means that when the thread wakes up and tries to lock
	 *	the mutex, it will be unlocked, and there won't be
	 *	contention.
	 */
	sem_post(&thread_pool.semaphore);

	return 1;
}

/*
 *	Remove a request from the queue.
 */
static int request_dequeue(REQUEST **prequest)
{
	time_t blocked;
	static time_t last_complained = 0;
	static time_t total_blocked = 0;
	int num_blocked = 0;
	REQUEST *request = NULL;

	reap_children();

	pthread_mutex_lock(&thread_pool.queue_mutex);

# if defined(WITH_STATS) && defined(WITH_ACCOUNTING)
	if (thread_pool.auto_limit_acct) {
		struct timeval now;

		gettimeofday(&now, NULL);

		/*
		 *	Calculate the instantaneous departure rate
		 *	from the queue.
		 */
		thread_pool.pps_out.pps = rad_pps(&thread_pool.pps_out.pps_old,
						  &thread_pool.pps_out.pps_now,
						  &thread_pool.pps_out.time_old,
						  &now);
		thread_pool.pps_out.pps_now++;
	}
# endif

retry:
	/*
	 *	Grab the first entry.
	 */
	request = fr_heap_peek(thread_pool.heap);
	if (!request) {
		pthread_mutex_unlock(&thread_pool.queue_mutex);
		*prequest = NULL;
		return 0;
	}

	(void) fr_heap_extract(thread_pool.heap, request);

	VERIFY_REQUEST(request);

	/*
	 *	Too late.  Mark it as done, and continue.
	 *
	 *	@fixme: with a heap, we can dynamically remove it from the heap!
	 */
	if (request->master_state == REQUEST_STOP_PROCESSING) {
		request->module = "<done>";
		request->child_state = REQUEST_DONE;
		goto retry;
	}

	*prequest = request;

	rad_assert(*prequest != NULL);
	rad_assert(request->magic == REQUEST_MAGIC);

	request->component = "<core>";
	request->module = "";
	request->child_state = REQUEST_RUNNING;

	/*
	 *	The thread is currently processing a request.
	 */
	thread_pool.active_threads++;

	blocked = time(NULL);
	if (!request->proxy && (blocked - request->timestamp.tv_sec) > 5) {
		total_blocked++;
		if (last_complained < blocked) {
			last_complained = blocked;
			blocked -= request->timestamp.tv_sec;
			num_blocked = total_blocked;
		} else {
			blocked = 0;
		}
	} else {
		total_blocked = 0;
		blocked = 0;
	}

	pthread_mutex_unlock(&thread_pool.queue_mutex);

	if (blocked) {
		ERROR("%d requests have been waiting in the processing queue for %d seconds. Check that all databases are running properly!",
		      num_blocked, (int) blocked);
	}

	return 1;
}
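
/*
 *	A note on the "blocked" logic above: 'blocked' starts out holding
 *	the current time; if a non-proxy request has been sitting in the
 *	queue for more than 5 seconds, it's converted into the age of that
 *	request in seconds, and 'last_complained' rate-limits the
 *	resulting error message to at most one per second.
 */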


/*
 *	The main thread handler for requests.
 *
 *	Wait on the semaphore until we have it, and process the request.
 */
static void *request_handler_thread(void *arg)
{
	THREAD_HANDLE *self = (THREAD_HANDLE *) arg;

	/*
	 *	Loop forever, until told to exit.
	 */
	do {
# ifdef HAVE_GPERFTOOLS_PROFILER_H
		ProfilerRegisterThread();
# endif

		/*
		 *	Wait to be signalled.
		 */
		DEBUG2("Thread %d waiting to be assigned a request",
		       self->thread_num);
	re_wait:
		if (sem_wait(&thread_pool.semaphore) != 0) {
			/*
			 *	Interrupted system call.  Go back to
			 *	waiting, but DON'T print out any more
			 *	text.
			 */
			if (errno == EINTR) {
				DEBUG2("Re-wait %d", self->thread_num);
				goto re_wait;
			}
			ERROR("Thread %d failed waiting for semaphore: %s: Exiting\n",
			      self->thread_num, fr_syserror(errno));
			break;
		}

		DEBUG2("Thread %d got semaphore", self->thread_num);

# ifdef HAVE_OPENSSL_ERR_H
		/*
		 *	Clear the error queue for the current thread.
		 */
		ERR_clear_error();
# endif

		/*
		 *	The server is exiting.  Don't dequeue any
		 *	requests.
		 */
		if (thread_pool.stop_flag) break;

		/*
		 *	Try to grab a request from the queue.
		 *
		 *	It may be empty, in which case we fail
		 *	gracefully.
		 */
		if (!request_dequeue(&self->request)) continue;

		self->request->child_pid = self->pthread_id;
		self->request_count++;

		DEBUG2("Thread %d handling request %d, (%d handled so far)",
		       self->thread_num, self->request->number,
		       self->request_count);

# if defined(WITH_STATS) && defined(WITH_ACCOUNTING)
		if ((self->request->packet->code == PW_CODE_ACCOUNTING_REQUEST) &&
		    thread_pool.auto_limit_acct) {
			VALUE_PAIR *vp;
			REQUEST *request = self->request;

			/*
			 *	Attributes 181-183 are FreeRADIUS VSAs
			 *	reporting queue load: packets per second
			 *	into the queue, packets per second out
			 *	of the queue, and the percentage of the
			 *	queue which is still free.
			 */
			vp = radius_pair_create(request, &request->config,
						181, VENDORPEC_FREERADIUS);
			if (vp) vp->vp_integer = thread_pool.pps_in.pps;

			vp = radius_pair_create(request, &request->config,
						182, VENDORPEC_FREERADIUS);
			if (vp) vp->vp_integer = thread_pool.pps_out.pps;

			vp = radius_pair_create(request, &request->config,
						183, VENDORPEC_FREERADIUS);
			if (vp) {
				vp->vp_integer = thread_pool.max_queue_size - fr_heap_num_elements(thread_pool.heap);
				vp->vp_integer *= 100;
				vp->vp_integer /= thread_pool.max_queue_size;
			}
		}
# endif

		self->request->process(self->request, FR_ACTION_RUN);
		self->request = NULL;

		/*
		 *	Update the active threads.
		 */
		pthread_mutex_lock(&thread_pool.queue_mutex);
		rad_assert(thread_pool.active_threads > 0);
		thread_pool.active_threads--;
		pthread_mutex_unlock(&thread_pool.queue_mutex);

		/*
		 *	If the thread has handled too many requests, then make it
		 *	exit.
		 */
		if ((thread_pool.max_requests_per_thread > 0) &&
		    (self->request_count >= thread_pool.max_requests_per_thread)) {
			DEBUG2("Thread %d handled too many requests",
			       self->thread_num);
			break;
		}
	} while (self->status != THREAD_CANCELLED);

	DEBUG2("Thread %d exiting...", self->thread_num);

# ifdef HAVE_OPENSSL_ERR_H
	/*
	 *	If we linked with OpenSSL, the application
	 *	must remove the thread's error queue before
	 *	exiting to prevent memory leaks.
	 */
	ERR_remove_state(0);
# endif

	pthread_mutex_lock(&thread_pool.queue_mutex);
	thread_pool.exited_threads++;
	pthread_mutex_unlock(&thread_pool.queue_mutex);

	/*
	 *	Do this as the LAST thing before exiting.
	 */
	self->request = NULL;
	self->status = THREAD_EXITED;
	exec_trigger(NULL, NULL, "server.thread.stop", true);

	return NULL;
}

/*
 *	Take a THREAD_HANDLE, delete it from the thread pool and
 *	free its resources.
 *
 *	This function is called ONLY from the main server thread,
 *	ONLY after the thread has exited.
 */
static void delete_thread(THREAD_HANDLE *handle)
{
	THREAD_HANDLE *prev;
	THREAD_HANDLE *next;

	rad_assert(handle->request == NULL);

	DEBUG2("Deleting thread %d", handle->thread_num);

	prev = handle->prev;
	next = handle->next;
	rad_assert(thread_pool.total_threads > 0);
	thread_pool.total_threads--;

	/*
	 *	Remove the handle from the list.
	 */
	if (prev == NULL) {
		rad_assert(thread_pool.head == handle);
		thread_pool.head = next;
	} else {
		prev->next = next;
	}

	if (next == NULL) {
		rad_assert(thread_pool.tail == handle);
		thread_pool.tail = prev;
	} else {
		next->prev = prev;
	}

	/*
	 *	Free the handle, now that it's no longer referencable.
	 */
	free(handle);
}


/*
 *	Spawn a new thread, and place it in the thread pool.
 *
 *	The thread is started initially in the blocked state, waiting
 *	for the semaphore.
 */
static THREAD_HANDLE *spawn_thread(time_t now, int do_trigger)
{
	int rcode;
	THREAD_HANDLE *handle;

	/*
	 *	Ensure that we don't spawn too many threads.
	 */
	if (thread_pool.total_threads >= thread_pool.max_threads) {
		DEBUG2("Thread spawn failed. Maximum number of threads (%d) already running.", thread_pool.max_threads);
		return NULL;
	}

	/*
	 *	Allocate a new thread handle.
	 */
	handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
	memset(handle, 0, sizeof(THREAD_HANDLE));
	handle->prev = NULL;
	handle->next = NULL;
	handle->thread_num = thread_pool.max_thread_num++;
	handle->request_count = 0;
	handle->status = THREAD_RUNNING;
	handle->timestamp = time(NULL);

	/*
	 *	Create the thread joinable, so that it can be cleaned up
	 *	using pthread_join().
	 *
	 *	Note that the function returns non-zero on error, NOT
	 *	-1.  The return code is the error, and errno isn't set.
	 */
	rcode = pthread_create(&handle->pthread_id, 0, request_handler_thread, handle);
	if (rcode != 0) {
		free(handle);
		ERROR("Thread create failed: %s",
		      fr_syserror(rcode));
		return NULL;
	}

	/*
	 *	One more thread to go into the list.
	 */
	thread_pool.total_threads++;
	DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
	       handle->thread_num, thread_pool.total_threads);
	if (do_trigger) exec_trigger(NULL, NULL, "server.thread.start", true);

	/*
	 *	Add the thread handle to the tail of the thread pool list.
	 */
	if (thread_pool.tail) {
		thread_pool.tail->next = handle;
		handle->prev = thread_pool.tail;
		thread_pool.tail = handle;
	} else {
		rad_assert(thread_pool.head == NULL);
		thread_pool.head = thread_pool.tail = handle;
	}

	/*
	 *	Update the time we last spawned a thread.
	 */
	thread_pool.time_last_spawned = now;

	/*
	 *	Fire trigger if maximum number of threads reached
	 */
	if (thread_pool.total_threads >= thread_pool.max_threads)
		exec_trigger(NULL, NULL, "server.thread.max_threads", true);

	/*
	 *	And return the new handle to the caller.
	 */
	return handle;
}
#endif	/* WITH_GCD */


#ifdef WNOHANG
static uint32_t pid_hash(void const *data)
{
	thread_fork_t const *tf = data;

	return fr_hash(&tf->pid, sizeof(tf->pid));
}

static int pid_cmp(void const *one, void const *two)
{
	thread_fork_t const *a = one;
	thread_fork_t const *b = two;

	return (a->pid - b->pid);
}
#endif

static int timestamp_cmp(void const *one, void const *two)
{
	REQUEST const *a = one;
	REQUEST const *b = two;

	if (timercmp(&a->timestamp, &b->timestamp, <)) return -1;
	if (timercmp(&a->timestamp, &b->timestamp, >)) return +1;

	return 0;
}

/*
 *	Smaller entries go to the top of the heap.
 *	Larger ones to the bottom of the heap.
 */
static int default_cmp(void const *one, void const *two)
{
	REQUEST const *a = one;
	REQUEST const *b = two;

	if (a->priority < b->priority) return -1;
	if (a->priority > b->priority) return +1;

	return timestamp_cmp(one, two);
}


/*
 *	Prioritize by how far along the EAP session is.
 */
static int state_cmp(void const *one, void const *two)
{
	REQUEST const *a = one;
	REQUEST const *b = two;

	/*
	 *	Rounds which are further along go higher in the heap.
	 */
	if (a->packet->rounds > b->packet->rounds) return -1;
	if (a->packet->rounds < b->packet->rounds) return +1;

	return default_cmp(one, two);
}
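
/*
 *	These comparators map to the "queue_priority" configuration item
 *	parsed in thread_pool_bootstrap() below: "default" (or unset)
 *	sorts by request priority and then by timestamp, "eap"
 *	additionally bumps later EAP rounds to the front, and "time"
 *	sorts strictly by arrival time.
 */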


/** Parse the configuration for the thread pool
 *
 */
int thread_pool_bootstrap(CONF_SECTION *cs, bool *spawn_workers)
{
	CONF_SECTION *pool_cf;

	rad_assert(spawn_workers != NULL);
	rad_assert(pool_initialized == false); /* not called on HUP */

	/*
	 *	Initialize the thread pool to some reasonable values.
	 */
	memset(&thread_pool, 0, sizeof(THREAD_POOL));
#ifndef WITH_GCD
	thread_pool.head = NULL;
	thread_pool.tail = NULL;
	thread_pool.total_threads = 0;
	thread_pool.max_thread_num = 1;
	thread_pool.cleanup_delay = 5;
	thread_pool.stop_flag = false;
#endif
	thread_pool.spawn_workers = *spawn_workers;

	pool_cf = cf_subsection_find_next(cs, NULL, "thread");
#ifdef WITH_GCD
	if (pool_cf) WARN("Built with Grand Central Dispatch. Ignoring 'thread' subsection");
#else
	if (!pool_cf) {
		*spawn_workers = false;
		return 0;
	}
#endif

#ifndef WITH_GCD
	if (cf_section_parse(pool_cf, NULL, thread_config) < 0) return -1;

	/*
	 *	Catch corner cases.
	 */
	if (thread_pool.min_spare_threads < 1)
		thread_pool.min_spare_threads = 1;
	if (thread_pool.max_spare_threads < 1)
		thread_pool.max_spare_threads = 1;
	if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
		thread_pool.max_spare_threads = thread_pool.min_spare_threads;
	if (thread_pool.max_threads == 0)
		thread_pool.max_threads = 256;
	if ((thread_pool.max_queue_size < 2) || (thread_pool.max_queue_size > 1024*1024)) {
		ERROR("FATAL: max_queue_size value must be in range 2-1048576");
		return -1;
	}

	if (thread_pool.start_threads > thread_pool.max_threads) {
		ERROR("FATAL: start_servers (%i) must be <= max_servers (%i)",
		      thread_pool.start_threads, thread_pool.max_threads);
		return -1;
	}

	if (!thread_pool.queue_priority ||
	    (strcmp(thread_pool.queue_priority, "default") == 0)) {
		thread_pool.heap_cmp = default_cmp;

	} else if (strcmp(thread_pool.queue_priority, "eap") == 0) {
		thread_pool.heap_cmp = state_cmp;

	} else if (strcmp(thread_pool.queue_priority, "time") == 0) {
		thread_pool.heap_cmp = timestamp_cmp;

	} else {
		ERROR("FATAL: Invalid queue_priority '%s'", thread_pool.queue_priority);
		return -1;
	}

#endif	/* WITH_GCD */
	return 0;
}
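
/*
 *	Startup order, as the names and the pool_initialized checks
 *	suggest: thread_pool_bootstrap() runs first and only parses the
 *	configuration; thread_pool_init() below then creates the
 *	semaphore, the mutexes, the request heap, and the initial
 *	start_servers worker threads.
 */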


/*
 *	Allocate the thread pool, and seed it with an initial number
 *	of threads.
 *
 *	FIXME: What to do on a SIGHUP???
 */
int thread_pool_init(void)
{
#ifndef WITH_GCD
	uint32_t i;
	int rcode;
#endif
	time_t now;

	now = time(NULL);

	/*
	 *	Don't bother initializing the mutexes or
	 *	creating the hash tables.  They won't be used.
	 */
	if (!thread_pool.spawn_workers) return 0;

	/*
	 *	The pool has already been initialized.  Don't spawn
	 *	new threads, and don't forget about forked children.
	 */
	if (pool_initialized) return 0;

#ifdef WNOHANG
	if ((pthread_mutex_init(&thread_pool.wait_mutex, NULL) != 0)) {
		ERROR("FATAL: Failed to initialize wait mutex: %s",
		      fr_syserror(errno));
		return -1;
	}

	/*
	 *	Create the hash table of child PID's
	 */
	thread_pool.waiters = fr_hash_table_create(NULL, pid_hash, pid_cmp, free);
	if (!thread_pool.waiters) {
		ERROR("FATAL: Failed to set up wait hash");
		return -1;
	}
#endif


#ifndef WITH_GCD
	/*
	 *	Initialize the queue of requests.
	 */
	memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
	rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
	if (rcode != 0) {
		ERROR("FATAL: Failed to initialize semaphore: %s",
		      fr_syserror(errno));
		return -1;
	}

	rcode = pthread_mutex_init(&thread_pool.queue_mutex, NULL);
	if (rcode != 0) {
		ERROR("FATAL: Failed to initialize queue mutex: %s",
		      fr_syserror(errno));
		return -1;
	}

	thread_pool.heap = fr_heap_create(thread_pool.heap_cmp, offsetof(REQUEST, heap_id));
	if (!thread_pool.heap) {
		ERROR("FATAL: Failed to initialize the incoming queue.");
		return -1;
	}
#endif

#ifndef WITH_GCD
	/*
	 *	Create a number of waiting threads.
	 *
	 *	If we fail while creating them, do something intelligent.
	 */
	for (i = 0; i < thread_pool.start_threads; i++) {
		if (spawn_thread(now, 0) == NULL) {
			return -1;
		}
	}
#else
	thread_pool.queue = dispatch_queue_create("org.freeradius.threads", NULL);
	if (!thread_pool.queue) {
		ERROR("Failed creating dispatch queue: %s\n",
		      fr_syserror(errno));
		fr_exit(1);
	}
#endif

	DEBUG2("Thread pool initialized");
	pool_initialized = true;
	return 0;
}


/*
 *	Stop all threads in the pool.
 */
void thread_pool_stop(void)
{
#ifndef WITH_GCD
	int i;
	int total_threads;
	THREAD_HANDLE *handle;
	THREAD_HANDLE *next;

	if (!pool_initialized) return;

	/*
	 *	Set the pool stop flag.
	 */
	thread_pool.stop_flag = true;

	/*
	 *	Wake up all threads, so that they see the stop flag.
	 */
	total_threads = thread_pool.total_threads;
	for (i = 0; i != total_threads; i++) {
		sem_post(&thread_pool.semaphore);
	}

	/*
	 *	Join and free all threads.
	 */
	for (handle = thread_pool.head; handle; handle = next) {
		next = handle->next;
		pthread_join(handle->pthread_id, NULL);
		delete_thread(handle);
	}

	fr_heap_delete(thread_pool.heap);

# ifdef WNOHANG
	fr_hash_table_free(thread_pool.waiters);
# endif
#endif
}


#ifdef WITH_GCD
int request_enqueue(REQUEST *request)
{
	dispatch_block_t block;

	block = ^{
		request->process(request, FR_ACTION_RUN);
	};

	dispatch_async(thread_pool.queue, block);

	return 1;
}
#endif

#ifndef WITH_GCD
/*
 *	Check the min_spare_threads and max_spare_threads.
 *
 *	If there are too many or too few threads waiting, then we
 *	either create some more, or delete some.
 */
static void thread_pool_manage(time_t now)
{
	uint32_t spare;
	int i, total;
	THREAD_HANDLE *handle, *next;
	uint32_t active_threads;

	/*
	 *	Loop over the thread pool, deleting exited threads.
	 */
	for (handle = thread_pool.head; handle; handle = next) {
		next = handle->next;

		/*
		 *	Maybe we've asked the thread to exit, and it
		 *	has agreed.
		 */
		if (handle->status == THREAD_EXITED) {
			pthread_join(handle->pthread_id, NULL);
			delete_thread(handle);
			pthread_mutex_lock(&thread_pool.queue_mutex);
			thread_pool.exited_threads--;
			pthread_mutex_unlock(&thread_pool.queue_mutex);
		}
	}

	/*
	 *	We don't need a mutex lock here, as we're reading
	 *	active_threads, and not modifying it.  We want a close
	 *	approximation of the number of active threads, and this
	 *	is good enough.
	 */
	active_threads = thread_pool.active_threads;
	spare = thread_pool.total_threads - active_threads;
	if (rad_debug_lvl) {
		static uint32_t old_total = 0;
		static uint32_t old_active = 0;

		if ((old_total != thread_pool.total_threads) || (old_active != active_threads)) {
			DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
			       thread_pool.total_threads, active_threads, spare);
			old_total = thread_pool.total_threads;
			old_active = active_threads;
		}
	}

	/*
	 *	If there are too few spare threads, go create some more.
	 */
	if ((thread_pool.total_threads < thread_pool.max_threads) &&
	    (spare < thread_pool.min_spare_threads)) {
		total = thread_pool.min_spare_threads - spare;

		if ((total + thread_pool.total_threads) > thread_pool.max_threads) {
			total = thread_pool.max_threads - thread_pool.total_threads;
		}

		DEBUG2("Threads: Spawning %d spares", total);

		/*
		 *	Create a number of spare threads.
		 */
		for (i = 0; i < total; i++) {
			handle = spawn_thread(now, 1);
			if (handle == NULL) {
				return;
			}
		}

		return;	/* there aren't too many spare threads */
	}

	/*
	 *	Only delete spare threads if we haven't already done
	 *	so this second.
	 */
	if (now == last_cleaned) {
		return;
	}
	last_cleaned = now;

	/*
	 *	Only delete the spare threads if sufficient time has
	 *	passed since we last created one.  This helps to minimize
	 *	the amount of create/delete cycles.
	 */
	if ((now - thread_pool.time_last_spawned) < (int)thread_pool.cleanup_delay) {
		return;
	}

	/*
	 *	If there are too many spare threads, delete one.
	 *
	 *	Note that we only delete ONE at a time, instead of
	 *	wiping out many.  This allows the excess servers to
	 *	be slowly reaped, just in case the load spike comes again.
	 */
	if (spare > thread_pool.max_spare_threads) {

		spare -= thread_pool.max_spare_threads;

		DEBUG2("Threads: deleting 1 spare out of %d spares", spare);

		/*
		 *	Walk through the thread pool, deleting the
		 *	first idle thread we come across.
		 */
		for (handle = thread_pool.head; (handle != NULL) && (spare > 0); handle = next) {
			next = handle->next;

			/*
			 *	If the thread is not handling a
			 *	request, but still live, then tell it
			 *	to exit.
			 *
			 *	It will eventually wake up, and realize
			 *	it's been told to commit suicide.
			 */
			if ((handle->request == NULL) &&
			    (handle->status == THREAD_RUNNING)) {
				handle->status = THREAD_CANCELLED;
				/*
				 *	Post an extra semaphore, as a
				 *	signal to wake up, and exit.
				 */
				sem_post(&thread_pool.semaphore);
				spare--;
				break;
			}
		}
	}

	/*
	 *	Otherwise everything's kosher.  There are not too few,
	 *	or too many spare threads.  Exit happily.
	 */
	return;
}
#endif	/* WITH_GCD */

#ifdef WNOHANG
/*
 *	Thread wrapper for fork().
 */
pid_t rad_fork(void)
{
	pid_t child_pid;

	if (!pool_initialized) return fork();

	reap_children();	/* be nice to non-wait thingies */

	if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
		return -1;
	}

	/*
	 *	Fork & save the PID for later reaping.
	 */
	child_pid = fork();
	if (child_pid > 0) {
		int rcode;
		thread_fork_t *tf;

		tf = rad_malloc(sizeof(*tf));
		memset(tf, 0, sizeof(*tf));

		tf->pid = child_pid;

		pthread_mutex_lock(&thread_pool.wait_mutex);
		rcode = fr_hash_table_insert(thread_pool.waiters, tf);
		pthread_mutex_unlock(&thread_pool.wait_mutex);

		if (!rcode) {
			ERROR("Failed to store PID, creating what will be a zombie process %d",
			      (int) child_pid);
			free(tf);
		}
	}

	/*
	 *	Return whatever we were told.
	 */
	return child_pid;
}


/*
 *	Wait 10 seconds at most for a child to exit, then give up.
 */
pid_t rad_waitpid(pid_t pid, int *status)
{
	int i;
	thread_fork_t mytf, *tf;

	if (!pool_initialized) return waitpid(pid, status, 0);

	if (pid <= 0) return -1;

	mytf.pid = pid;

	pthread_mutex_lock(&thread_pool.wait_mutex);
	tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
	pthread_mutex_unlock(&thread_pool.wait_mutex);

	if (!tf) return -1;

	for (i = 0; i < 100; i++) {
		reap_children();

		if (tf->exited) {
			*status = tf->status;

			pthread_mutex_lock(&thread_pool.wait_mutex);
			fr_hash_table_delete(thread_pool.waiters, &mytf);
			pthread_mutex_unlock(&thread_pool.wait_mutex);
			return pid;
		}
		usleep(100000);	/* sleep for 1/10 of a second */
	}

	/*
	 *	10 seconds have passed, give up on the child.
	 */
	pthread_mutex_lock(&thread_pool.wait_mutex);
	fr_hash_table_delete(thread_pool.waiters, &mytf);
	pthread_mutex_unlock(&thread_pool.wait_mutex);

	return 0;
}
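
/*
 *	A typical caller (a sketch, not code from this file): a module
 *	which runs an external program does roughly
 *
 *		pid_t pid = rad_fork();
 *		if (pid == 0) execve(...);	// child
 *		...
 *		if (rad_waitpid(pid, &status) == pid) {
 *			// child reaped; WEXITSTATUS(status) is valid
 *		}
 *
 *	so reaping is coordinated through thread_pool.waiters instead of
 *	a SIGCHLD handler (see the comment above reap_children()).
 */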
#else
/*
 *	No rad_fork or rad_waitpid
 */
#endif

void thread_pool_queue_stats(int array[RAD_LISTEN_MAX], int pps[2])
{
	int i;

#ifndef WITH_GCD
	if (pool_initialized) {
		struct timeval now;

		/*
		 *	@fixme: the list of listeners is no longer
		 *	fixed in size.
		 */
		memset(array, 0, sizeof(array[0]) * RAD_LISTEN_MAX);
		array[0] = fr_heap_num_elements(thread_pool.heap);

		gettimeofday(&now, NULL);

		pps[0] = rad_pps(&thread_pool.pps_in.pps_old,
				 &thread_pool.pps_in.pps_now,
				 &thread_pool.pps_in.time_old,
				 &now);
		pps[1] = rad_pps(&thread_pool.pps_out.pps_old,
				 &thread_pool.pps_out.pps_now,
				 &thread_pool.pps_out.time_old,
				 &now);

	} else
#endif	/* WITH_GCD */
	{
		for (i = 0; i < RAD_LISTEN_MAX; i++) {
			array[i] = 0;
		}

		pps[0] = pps[1] = 0;
	}
}

/** Return the maximum number of threads that can run concurrently
 *
 */
uint32_t thread_pool_max_threads(void)
{
	return thread_pool.max_threads;
}
#endif	/* HAVE_PTHREAD_H */
Definition: heap.c:217