Categories: .Net, C# Posted by mheydt on 2/5/2008 5:01 PM | Comments (0)
I've been struggling again with the .NET built-in ThreadPool class.  Fundamentally, it's easy enough to use (at least for me), but I keep bumping up against limitations and problems that drive me nuts. The latest issue, one that I've seen before and have run into again lately, is the amount of time it sometimes takes to spin up a thread to handle a request.  I haven't done an actual timing test to quantify it exactly, but I've seen it take multiple seconds to get a thread going!

Another issue is that I hate not having control of the threads that get created, so that I could do things like abort them if needed; I have seen that a pool thread that doesn't terminate at the end of a WPF application will sometimes keep the executable running.  Sure, I should be able to get around this with proper coding (flags, events to stop, …), but it still reinforces my view that the built-in class is just not adequate, for many reasons. So I did a little googling today and found this reference, written by Marc Clifton, which I thought was quite excellent: ThreadPools. In the article he discusses what he has discovered about the implementation, including the waits/sleeps that cause latency when starting up threads, which is exactly the problem that I am having.

He also references Stephen Toub's ManagedThreadPool class, provided at this link, which doesn't exhibit these problems.  This is the only code I could find, and it's part of the Subtext project, so I'll just attach the code here.  I think I'll give this class a try and let you know if it solves my problems, and if I extend it I'll surely republish it for everyone.



   1:  public static class ManagedThreadPool
   2:  {    
   3:      static Log Log = new Log();
   4:   
   5:      #region Constants
   6:      /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
   7:      private static int _maxWorkerThreads = 5;
   8:   
   9:      #endregion
  10:   
  11:      #region Member Variables
  12:      /// <summary>Queue of all the callbacks waiting to be executed.</summary>
  13:      static Queue _waitingCallbacks;
  14:      /// <summary>
  15:      /// Used to signal that a worker thread is needed for processing.  Note that multiple
  16:      /// threads may be needed simultaneously and as such we use a semaphore instead of
  17:      /// an auto reset event.
  18:      /// </summary>
  19:      static Semaphore _workerThreadNeeded;
  20:      /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
  21:      static ArrayList _workerThreads;
  22:      /// <summary>Number of threads currently active.</summary>
  23:      static int _inUseThreads;
  24:      #endregion
  25:   
  26:      #region Construction
  27:      /// <summary>Initialize the thread pool.</summary>
  28:      static ManagedThreadPool()
  29:      {
  30:          try
  31:          {
  32:              _maxWorkerThreads = Subtext.Framework.Configuration.Config.Settings.QueuedThreads;
  33:          }
  34:          catch{}
  35:          // Create our thread stores; we handle synchronization ourself
  36:          // as we may run into situtations where multiple operations need to be atomic.
  37:          // We keep track of the threads we've created just for good measure; not actually
  38:          // needed for any core functionality.
  39:          _waitingCallbacks = new Queue();
  40:          _workerThreads = new ArrayList();
  41:   
  42:          // Create our "thread needed" event
  43:          _workerThreadNeeded = new Semaphore(0);
  44:          
  45:          // Create all of the worker threads
  46:          for(int i=0; i<_maxWorkerThreads; i++)
  47:          {
  48:              // Create a new thread and add it to the list of threads.
  49:              Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
  50:              _workerThreads.Add(newThread);
  51:   
  52:              // Configure the new thread and start it
  53:              newThread.Name = "ManagedPoolThread #" + i.ToString(CultureInfo.InvariantCulture);
  54:              newThread.IsBackground = true;
  55:              newThread.Start();
  56:          }
  57:      }
  58:      #endregion
  59:   
  60:      #region Public Methods
  61:      /// <summary>Queues a user work item to the thread pool.</summary>
  62:      /// <param name="callback">
  63:      /// A WaitCallback representing the delegate to invoke when the thread in the
  64:      /// thread pool picks up the work item.
  65:      /// </param>
  66:      public static void QueueUserWorkItem(WaitCallback callback)
  67:      {
  68:          // Queue the delegate with no state
  69:          QueueUserWorkItem(callback, null);
  70:      }
  71:   
  72:      /// <summary>Queues a user work item to the thread pool.</summary>
  73:      /// <param name="callback">
  74:      /// A WaitCallback representing the delegate to invoke when the thread in the
  75:      /// thread pool picks up the work item.
  76:      /// </param>
  77:      /// <param name="state">
  78:      /// The object that is passed to the delegate when serviced from the thread pool.
  79:      /// </param>
  80:      public static void QueueUserWorkItem(WaitCallback callback, object state)
  81:      {
  82:          // Create a waiting callback that contains the delegate and its state.
  83:          // At it to the processing queue, and signal that data is waiting.
  84:          WaitingCallback waiting = new WaitingCallback(callback, state);
  85:          using(TimedLock.Lock(_waitingCallbacks.SyncRoot)) { _waitingCallbacks.Enqueue(waiting); }
  86:          _workerThreadNeeded.AddOne();
  87:      }
  88:   
  89:      /// <summary>Empties the work queue of any queued work items.</summary>
  90:      public static void EmptyQueue()
  91:      {
  92:          using(TimedLock.Lock(_waitingCallbacks.SyncRoot))
  93:          {
  94:              try
  95:              {
  96:                  // Try to dispose of all remaining state
  97:                  foreach(object obj in _waitingCallbacks)
  98:                  {
  99:                      WaitingCallback callback = (WaitingCallback)obj;
 100:                      if (callback.State is IDisposable) ((IDisposable)callback.State).Dispose();
 101:                  }
 102:              }
 103:              catch
 104:              {
 105:                  // Make sure an error isn't thrown.
 106:              }
 107:   
 108:              // Clear all waiting items and reset the number of worker threads currently needed
 109:              // to be 0 (there is nothing for threads to do)
 110:              _waitingCallbacks.Clear();
 111:              _workerThreadNeeded.Reset(0);
 112:          }
 113:      }
 114:      #endregion
 115:   
 116:      #region Properties
 117:      /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
 118:      public static int MaxThreads { get { return _maxWorkerThreads; } }
 119:      /// <summary>Gets the number of currently active threads in the thread pool.</summary>
 120:      public static int ActiveThreads { get { return _inUseThreads; } }
 121:      /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
 122:      public static int WaitingCallbacks { get { using(TimedLock.Lock(_waitingCallbacks.SyncRoot)) { return _waitingCallbacks.Count; } } }
 123:      #endregion
 124:   
 125:      #region Thread Processing
 126:      /// <summary>A thread worker function that processes items from the work queue.</summary>
 127:      private static void ProcessQueuedItems()
 128:      {
 129:          // Process indefinitely
 130:          while(true)
 131:          {
 132:              // Get the next item in the queue.  If there is nothing there, go to sleep
 133:              // for a while until we're woken up when a callback is waiting.
 134:              WaitingCallback callback = null;
 135:              while (callback == null)
 136:              {
 137:                  // Try to get the next callback available.  We need to lock on the
 138:                  // queue in order to make our count check and retrieval atomic.
 139:                  using(TimedLock.Lock(_waitingCallbacks.SyncRoot))
 140:                  {
 141:                      if (_waitingCallbacks.Count > 0)
 142:                      {
 143:                          try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); }
 144:                          catch{} // make sure not to fail here
 145:                      }
 146:                  }
 147:   
 148:                  // If we can't get one, go to sleep.
 149:                  if (callback == null) _workerThreadNeeded.WaitOne();
 150:              }
 151:   
 152:              // We now have a callback.  Execute it.  Make sure to accurately
 153:              // record how many callbacks are currently executing.
 154:              try
 155:              {
 156:                  Interlocked.Increment(ref _inUseThreads);
 157:                  callback.Callback(callback.State);
 158:              }
 159:              catch(SqlException)
 160:              {
 161:                  throw;
 162:              }
 163:              catch(Exception exc)
 164:              {
 165:                  // Make sure we don't throw here.
 166:                     try
 167:                     {
 168:                         Log.Error("Error while processing queued items.", exc);
 169:                     }
 170:                     catch
 171:                     {
 172:                         Log.Error("Unexpected exception while processing queued items.");
 173:                     }
 174:              }
 175:              finally
 176:              {
 177:                  Interlocked.Decrement(ref _inUseThreads);
 178:              }
 179:          }
 180:      }
 181:      #endregion
 182:   
 183:      /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
 184:      private class WaitingCallback
 185:      {
 186:          #region Member Variables
 187:          /// <summary>Callback delegate for the callback.</summary>
 188:          private WaitCallback _callback;
 189:          /// <summary>State with which to call the callback delegate.</summary>
 190:          private object _state;
 191:          #endregion
 192:   
 193:          #region Construction
 194:          /// <summary>Initialize the callback holding object.</summary>
 195:          /// <param name="callback">Callback delegate for the callback.</param>
 196:          /// <param name="state">State with which to call the callback delegate.</param>
 197:          public WaitingCallback(WaitCallback callback, object state)
 198:          {
 199:              _callback = callback;
 200:              _state = state;
 201:          }
 202:          #endregion
 203:   
 204:          #region Properties
 205:          /// <summary>Gets the callback delegate for the callback.</summary>
 206:          public WaitCallback Callback { get { return _callback; } }
 207:          /// <summary>Gets the state with which to call the callback delegate.</summary>
 208:          public object State { get { return _state; } }
 209:          #endregion
 210:      }
 211:  }

Comments