by mheydt
5.
February 2008 17:01
>
I've been struggling again with the .NET built-in ThreadPool class. Fundamentally, it's easy enough to use (at least for me), but I keep bumping up against limitations and problems that drive me nuts. The latest issue, one that I've seen before but have run into again lately, is the amount of time it sometimes takes to spin up a thread to handle a request. I haven't done an actual timing test to quantify it exactly, but I've seen it take multiple seconds to get a thread going!
Another issue is that I hate not having control over the threads that are created, so that I can do things like abort them if needed. I have seen that a pool thread that doesn't terminate at the end of a WPF application will sometimes keep the executable running. Sure, there are ways I should be able to get around this with proper coding (flags, events to stop, …), but I still think the built-in class is just not adequate, for many reasons. So I did a little googling today and found this reference, written by Marc Clifton, which I thought was quite excellent: ThreadPools. In this article he discusses what he has discovered about the implementation, including the waits/sleeps that cause latency in starting up threads, which is exactly the problem I am having.
He also references Stephen Toub's ManagedThreadPool class, provided at this link, which doesn't exhibit these problems. This is the only code I could find, and it's part of the Subtext project, so I'll just attach the code here. I think I'll give this class a try and let you know if it solves my problems, and if I extend it I'll surely republish it for everyone.
/// <summary>
/// A fixed-size pool of dedicated background threads that execute queued
/// <see cref="WaitCallback"/> delegates in FIFO order. All worker threads are
/// created and started once, in the static constructor, and then block on a
/// semaphore until work is queued.
/// </summary>
public static class ManagedThreadPool
{
    static readonly Log Log = new Log();

    #region Constants
    /// <summary>
    /// Maximum number of threads the thread pool has at its disposal. Starts at the
    /// built-in default and may be overridden once, by configuration, in the static
    /// constructor.
    /// </summary>
    private static int _maxWorkerThreads = 5;

    #endregion

    #region Member Variables
    /// <summary>Queue of all the callbacks waiting to be executed.</summary>
    static readonly Queue _waitingCallbacks;
    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note that multiple
    /// threads may be needed simultaneously and as such we use a semaphore instead of
    /// an auto reset event.
    /// </summary>
    static readonly Semaphore _workerThreadNeeded;
    /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
    static readonly ArrayList _workerThreads;
    /// <summary>Number of threads currently executing a callback.</summary>
    static int _inUseThreads;
    #endregion

    #region Construction
    /// <summary>
    /// Initializes the thread pool: reads the configured pool size, creates the work
    /// queue and the signalling semaphore, and starts every worker thread.
    /// </summary>
    static ManagedThreadPool()
    {
        try
        {
            int configuredThreads = Subtext.Framework.Configuration.Config.Settings.QueuedThreads;

            // A zero or negative setting would start no workers at all, so queued items
            // would accumulate forever without running. Keep the default in that case.
            if (configuredThreads > 0)
            {
                _maxWorkerThreads = configuredThreads;
            }
        }
        catch
        {
            // Best effort: if configuration cannot be read, fall back to the default
            // pool size. Nothing may escape from here, because an exception thrown by
            // a static constructor leaves the type unusable for the rest of the
            // application domain's lifetime.
        }

        // Create our thread stores; we handle synchronization ourself
        // as we may run into situations where multiple operations need to be atomic.
        // We keep track of the threads we've created just for good measure; not actually
        // needed for any core functionality.
        _waitingCallbacks = new Queue();
        _workerThreads = new ArrayList();

        // Create our "thread needed" event
        _workerThreadNeeded = new Semaphore(0);

        // Create all of the worker threads
        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            // Create a new thread and add it to the list of threads.
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);

            // Configure the new thread and start it. Background threads do not keep
            // the process alive on their own.
            newThread.Name = "ManagedPoolThread #" + i.ToString(CultureInfo.InvariantCulture);
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion

    #region Public Methods
    /// <summary>Queues a user work item, with no state object, to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        // Queue the delegate with no state
        QueueUserWorkItem(callback, null);
    }

    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when the thread in the
    /// thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the thread pool.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Fail fast on the caller's thread. A null delegate would otherwise only
        // surface later, on a worker thread, as a logged NullReferenceException.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        // Create a waiting callback that contains the delegate and its state.
        // Add it to the processing queue, and signal that data is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        using (TimedLock.Lock(_waitingCallbacks.SyncRoot))
        {
            _waitingCallbacks.Enqueue(waiting);
        }
        _workerThreadNeeded.AddOne();
    }

    /// <summary>
    /// Empties the work queue of any queued work items, disposing every queued state
    /// object that implements <see cref="IDisposable"/>. Items that a worker has
    /// already dequeued are not affected.
    /// </summary>
    public static void EmptyQueue()
    {
        using (TimedLock.Lock(_waitingCallbacks.SyncRoot))
        {
            // Try to dispose of all remaining state. Each item is guarded separately
            // so that one failing Dispose() does not prevent the rest from being
            // disposed.
            foreach (object obj in _waitingCallbacks)
            {
                try
                {
                    IDisposable disposable = ((WaitingCallback)obj).State as IDisposable;
                    if (disposable != null)
                    {
                        disposable.Dispose();
                    }
                }
                catch
                {
                    // Make sure an error isn't thrown.
                }
            }

            // Clear all waiting items and reset the number of worker threads currently needed
            // to be 0 (there is nothing for threads to do)
            _waitingCallbacks.Clear();
            _workerThreadNeeded.Reset(0);
        }
    }
    #endregion

    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads { get { return _maxWorkerThreads; } }
    /// <summary>Gets the number of currently active threads in the thread pool.</summary>
    public static int ActiveThreads { get { return _inUseThreads; } }
    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks
    {
        get
        {
            using (TimedLock.Lock(_waitingCallbacks.SyncRoot))
            {
                return _waitingCallbacks.Count;
            }
        }
    }
    #endregion

    #region Thread Processing
    /// <summary>
    /// A thread worker function that processes items from the work queue. Runs for
    /// the lifetime of the worker thread and never returns normally.
    /// </summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely
        while (true)
        {
            // Get the next item in the queue. If there is nothing there, go to sleep
            // for a while until we're woken up when a callback is waiting.
            WaitingCallback callback = null;
            while (callback == null)
            {
                // Try to get the next callback available. We need to lock on the
                // queue in order to make our count check and retrieval atomic.
                using (TimedLock.Lock(_waitingCallbacks.SyncRoot))
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); }
                        catch { } // make sure not to fail here
                    }
                }

                // If we can't get one, go to sleep.
                if (callback == null)
                {
                    _workerThreadNeeded.WaitOne();
                }
            }

            // We now have a callback. Execute it. Make sure to accurately
            // record how many callbacks are currently executing: the increment
            // happens before the try so that the finally's decrement always
            // pairs with a completed increment.
            Interlocked.Increment(ref _inUseThreads);
            try
            {
                callback.Callback(callback.State);
            }
            catch (SqlException)
            {
                // NOTE(review): rethrowing lets the exception escape this worker
                // thread, which ends the thread and, depending on the runtime's
                // unhandled-exception policy, may end the process. Kept as in the
                // original; confirm that this is intentional.
                throw;
            }
            catch (Exception exc)
            {
                // Make sure we don't throw here.
                try
                {
                    Log.Error("Error while processing queued items.", exc);
                }
                catch
                {
                    Log.Error("Unexpected exception while processing queued items.");
                }
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion

    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private sealed class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private readonly WaitCallback _callback;
        /// <summary>State with which to call the callback delegate.</summary>
        private readonly object _state;
        #endregion

        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion

        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback { get { return _callback; } }
        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State { get { return _state; } }
        #endregion
    }
}
79b2bcb1-d188-4c5e-bc77-fcd5b9ef3232|0|.0
Tags:
.Net | C#