Thread Synchronization Between Worker Threads April 15, 2021
Posted by codinglifestyle in Architecture, C#, CodeProject, Parallelism. Tags: BackgroundWorker, ISynchronizeInvoke, Parallelism, synchronization, threads
Many moons ago I wrote the last Windows service I’d ever need. It has a pluggable architecture and today there are over 20 modules doing all manner of business tasks such as sending emails, generating reports, performing automation, etc. Inside each plug-in module is a thread daemon which listens to events to orchestrate the operations of the other supporting module threads. This primarily consists of a thread which provides incoming data (what?), a thread which is the scheduler (when?), and a thread pool of worker threads which process said data (how?).
Recently a new plug-in was created to enforce global trade compliance. A bug crept in during the new worker thread initialization which surprised me, because it caused the incoming data queue thread to die even though that code hadn’t changed in years. The reason was that the incoming queue fired the event and was therefore the thread which executed the delegate (a.k.a. function pointer). That function is defined in the thread daemon class, which takes care of initializing and starting a worker thread. From an error-handling point of view this struck me as less than ideal. So the question was asked:
How do I get the thread daemon to be the thread which executes its own management functions when they are triggered by events?
This got me thinking about WinForms apps, where a worker thread must marshal execution of a delegate to the main UI thread to update the UI, such as changing the status bar. The pattern is to check whether an invoke is required (am I the UI thread?) and, if not, use the control’s built-in ISynchronizeInvoke support to invoke the delegate on the UI thread.
public delegate void UpdateStatusEvent(string message, params object[] args);

public void UpdateStatus(string message, params object[] args)
{
    if (_Status.InvokeRequired)
    {
        UpdateStatusEvent amc = new UpdateStatusEvent(UpdateStatus);
        this.Invoke(amc, new object[] { message, args });
    }
    else
    {
        //Now we are the UI thread we can actually update the status bar
        _LabelStatus.Text = string.Format(message, args);
        _Status.Refresh();
    }
}
With this pattern in my head I thought I’d be able to implement the same thing quite easily between my background worker threads. How wrong I was, and hence this article! WinForms and WPF each provide a synchronization context, but there is nothing in the framework you get for free in my case. From what I understand, both rely on the classic Windows message pump to marshal execution from the worker thread to the UI thread. This will become important shortly, but suffice it to say that after a lot of searching no pre-baked, easy solution was on offer.
As I continued down this path I searched for ISynchronizeInvoke and IAsyncResult, which primarily returned questions and examples of the above pattern. I tried in vain to find an equivalent implementation for worker threads. Surely it wasn’t that complicated to get an event initiated in Thread A to execute in Thread B? I tried using the SynchronizationContext class but quickly discovered it’s just a base class which doesn’t do the work of marshalling execution from Thread A to Thread B. So while I went through the motions, the code still executed in the wrong thread context (so why isn’t this class abstract if you must extend it to get it to work?). By the way, the way I was testing which thread was running was to refer to Thread.CurrentThread.ManagedThreadId.
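To illustrate what I was seeing, here is a minimal sketch (a standalone console test, not code from the service) showing that the base SynchronizationContext simply runs a delegate on the calling thread:

using System;
using System.Threading;

class SyncContextCheck
{
    static void Main()
    {
        Console.WriteLine("Caller thread: {0}", Thread.CurrentThread.ManagedThreadId);

        var context = new SynchronizationContext();

        //The base class has no queue or target thread of its own, so Send simply
        //invokes the delegate synchronously on whichever thread called it
        context.Send(state => Console.WriteLine("Delegate thread: {0}",
            Thread.CurrentThread.ManagedThreadId), null);
    }
}

Both lines print the same thread ID, which is exactly the “wrong thread context” behaviour described above.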
So now I had wasted a lot of time trying to find an easy answer and had to accept that some work on my part would be necessary in this scenario. What struck me was the notion of the classic Windows message pump being crucial to the synchronization context: it keeps a message queue running, so one thread can enqueue a message and another thread can execute it when it is able. So in the thread daemon I defined a queue and, in OnDoWork, created my rudimentary “message pump”.
private Queue<WaitCallback> _qWorkItems = new Queue<WaitCallback>();
private object _oLock = new object();

protected override void OnDoWork(DoWorkEventArgs e)
{
    LogSvc.Debug(this, "Running");

    while (!CancellationPending)
    {
        //Critical section
        lock (_oLock)
        {
            //This is the message pump allowing for the thread synchronization context
            while (_qWorkItems.Count > 0)
            {
                LogSvc.Debug(this, "Dequeue and invoke work item on thread {0}", Thread.CurrentThread.ManagedThreadId);

                //Dequeue next work item
                var workItem = _qWorkItems.Dequeue();

                //Execute work item in thread daemon context
                workItem.Invoke(null);
            }

            //Wait for new work items to process
            Monitor.Wait(_oLock);
        }
    }
}
/// <summary>
/// Queues a method for execution. The method executes when the thread daemon is available.
/// </summary>
/// <param name="callback">The callback.</param>
public void QueueWorkItem(WaitCallback callback)
{
    LogSvc.Debug(this, "Enqueuing work item from event caller thread {0}", Thread.CurrentThread.ManagedThreadId);

    //Critical section
    lock (_oLock)
    {
        _qWorkItems.Enqueue(callback);
        Monitor.Pulse(_oLock);
    }
}
I started with a Queue<Action>, which allowed me to accomplish asynchronous execution. However, I also wanted to support synchronous execution so I could get the return value from the delegate. So I looked at what ThreadPool.QueueUserWorkItem uses and settled on WaitCallback.
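For completeness, asynchronous use of QueueWorkItem might look something like this (a sketch; _daemon stands in for however you hold a reference to the thread daemon):

//Fire-and-forget: the delegate runs later, on the thread daemon's thread
_daemon.QueueWorkItem(delegate(object state)
{
    LogSvc.Debug(this, "Running on daemon thread {0}", Thread.CurrentThread.ManagedThreadId);
});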
Now we have our thread daemon set up to queue and execute operations in its own thread context. What we need next is a synchronization context to allow the worker threads to marshal the delegate and data from their thread to the thread daemon’s thread. We’ll implement both ISynchronizeInvoke and IAsyncResult classes to nicely encapsulate this functionality. This offers a test to see if an invoke is required and supports both asynchronous and synchronous execution of the event delegate.
/// <summary>
/// Internally used by ThreadSynchronization to represent asynchronous operations
/// </summary>
/// <seealso cref="System.IAsyncResult" />
class ThreadAsyncResult : IAsyncResult
{
    /// <summary>
    /// Gets a value that indicates whether the asynchronous operation has completed.
    /// </summary>
    public bool IsCompleted { get; set; }

    /// <summary>
    /// Gets a <see cref="T:System.Threading.WaitHandle" /> that is used to wait for an asynchronous operation to complete.
    /// </summary>
    public WaitHandle AsyncWaitHandle { get; internal set; }

    object _state;
    /// <summary>
    /// Gets a user-defined object that qualifies or contains information about an asynchronous operation.
    /// </summary>
    public object AsyncState
    {
        get
        {
            if (Exception != null)
            {
                throw Exception;
            }
            return _state;
        }
        internal set
        {
            _state = value;
        }
    }

    /// <summary>
    /// Gets a value that indicates whether the asynchronous operation completed synchronously.
    /// </summary>
    public bool CompletedSynchronously { get { return IsCompleted; } }

    /// <summary>
    /// Gets or sets the exception.
    /// </summary>
    /// <value>
    /// The exception.
    /// </value>
    internal Exception Exception { get; set; }
}
/// <summary>
/// Thread synchronization context to marshal delegate and data to ThreadManager thread
/// </summary>
/// <seealso cref="System.ComponentModel.ISynchronizeInvoke" />
class ThreadSynchronization : ISynchronizeInvoke
{
    public readonly int _nExecutingContextID = 0;
    private ThreadManager _manager;

    /// <summary>
    /// Initializes a new instance of the <see cref="ThreadSynchronization"/> class.
    /// </summary>
    /// <param name="manager">The thread manager object</param>
    public ThreadSynchronization(ThreadManager manager)
    {
        _nExecutingContextID = Thread.CurrentThread.ManagedThreadId;
        _manager = manager;
        Log.Debug("Synchronization context created for thread {0}", _nExecutingContextID);
    }

    /// <summary>
    /// Gets a value indicating whether the caller must call <see cref="M:System.ComponentModel.ISynchronizeInvoke.Invoke(System.Delegate,System.Object[])" /> when calling an object that implements this interface.
    /// </summary>
    public bool InvokeRequired => Thread.CurrentThread.ManagedThreadId != _nExecutingContextID;

    /// <summary>
    /// Asynchronously executes the delegate on the thread that created this object.
    /// </summary>
    /// <param name="method">A <see cref="T:System.Delegate" /> to a method that takes parameters of the same number and type that are contained in <paramref name="args" />.</param>
    /// <param name="args">An array of type <see cref="T:System.Object" /> to pass as arguments to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
    /// <returns>
    /// An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling this method.
    /// </returns>
    public IAsyncResult BeginInvoke(Delegate method, object[] args)
    {
        var result = new ThreadAsyncResult();
        var manualResetEvent = new ManualResetEvent(false);
        result.AsyncWaitHandle = manualResetEvent;

        _manager.QueueWorkItem(delegate
        {
            try
            {
                //Invoke the delegate and capture the return value
                result.AsyncState = method.DynamicInvoke(args);
            }
            catch (Exception ex)
            {
                Log.Err(ex);
                //Capture the exception
                result.Exception = ex;
            }
            finally
            {
                //Mark complete
                result.IsCompleted = true;
                //Set event for anyone waiting
                manualResetEvent.Set();
            }
        });

        return result;
    }

    /// <summary>
    /// Waits until the process started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" /> completes, and then returns the value generated by the process.
    /// </summary>
    /// <param name="result">An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" />.</param>
    /// <returns>
    /// An <see cref="T:System.Object" /> that represents the return value generated by the asynchronous operation.
    /// </returns>
    public object EndInvoke(IAsyncResult result)
    {
        //If not complete then wait until done
        if (!result.IsCompleted)
        {
            result.AsyncWaitHandle.WaitOne();
        }

        //The return value of the delegate
        return result.AsyncState;
    }

    /// <summary>
    /// Synchronously executes the delegate on the thread that created this object and marshals the call to the creating thread.
    /// </summary>
    /// <param name="method">A <see cref="T:System.Delegate" /> that contains a method to call, in the context of the thread for the control.</param>
    /// <param name="args">An array of type <see cref="T:System.Object" /> that represents the arguments to pass to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
    /// <returns>
    /// An <see cref="T:System.Object" /> that represents the return value from the delegate being invoked, or <see langword="null" /> if the delegate has no return value.
    /// </returns>
    public object Invoke(Delegate method, object[] args)
    {
        //Get IAsyncResult operation
        var result = BeginInvoke(method, args);

        //Wait for asynchronous operation to complete
        EndInvoke(result);

        //The return value of the delegate
        return result.AsyncState;
    }
}
Notice that ThreadSynchronization is tied to our thread daemon object, which implements QueueWorkItem. You could expose access to QueueWorkItem a different way if you wish. So, at long last, we have everything set up and we’re ready to alter the events themselves. These events, located within the thread daemon class, would otherwise have executed in another worker thread’s execution context. By instantiating the ThreadSynchronization object we can test whether an invoke is required, enqueue the work to execute on the thread daemon thread, and even get the return result of the event.
bool Incoming_Dequeued(object oData)
{
    bool bReturn = false;

    //If the event is called by a thread other than thread daemon...
    if (Sync.InvokeRequired)
    {
        //Marshal delegate call and data to thread daemon context
        var result = Sync.Invoke(new Func<object, bool>(Incoming_Dequeued), new object[] { oData });
        bReturn = TypeParser.ParseBool(result, false);
        return bReturn;
    }

    //Execute this code in the context of the thread daemon

    return bReturn;
}
void ThreadPool_ThreadComplete(IModuleThread sender, object oResult)
{
    //If the event is called by a thread other than thread daemon...
    if (Sync.InvokeRequired)
    {
        //Marshal delegate call and data to thread daemon context
        Sync.Invoke(new Action<IModuleThread, object>(ThreadPool_ThreadComplete), new object[] { sender, oResult });
        return;
    }

    //Execute this code in the context of the thread daemon
}
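One piece not shown above is where the Sync member comes from. A minimal sketch, assuming the daemon exposes it as a property and is itself the ThreadManager passed to the constructor; the key point is that it must be created on the daemon thread so the captured ManagedThreadId is the one we marshal to:

internal ThreadSynchronization Sync { get; private set; }

protected override void OnDoWork(DoWorkEventArgs e)
{
    //Created here, on the daemon thread itself, before the message pump starts
    Sync = new ThreadSynchronization(this);

    LogSvc.Debug(this, "Running");
    while (!CancellationPending)
    {
        //...the message pump from the earlier listing...
    }
}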
At last, here is the pattern I was looking for, all too familiar to anyone who has worked on WinForms or WPF. Now we can easily see if we’re on the correct thread and, if not, do an asynchronous or synchronous invoke of the delegate and data. When invoking, it’s easy to use an Action<TIn1, TIn2> or Func<TIn1, TIn2, TOut>, as required, to generate your delegate.
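The handlers above use the synchronous Invoke, but the asynchronous path works just as well when the caller doesn’t need to block; a sketch:

//Queue the delegate on the daemon and carry on; EndInvoke can be called later
//if the return value (or any captured exception) is needed
IAsyncResult result = Sync.BeginInvoke(new Action<IModuleThread, object>(ThreadPool_ThreadComplete),
                                       new object[] { sender, oResult });

//...other work on the calling thread...

Sync.EndInvoke(result); //blocks only if the daemon hasn't executed the delegate yet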
In conclusion, you can see why Microsoft didn’t have a prepackaged solution to this: they couldn’t presume the executing thread would implement a message pump in any strict fashion. They did provide ISynchronizeInvoke, which in turn needs IAsyncResult. Just creating these objects and implementing their interfaces lays bare what you need to do. While I love how it encapsulates this functionality in a familiar manner, it’s not strictly necessary. Really, just the implementation of the message pump in our executing thread, along with a composite object containing the delegate, data, and a lock, would be enough to marshal the required pieces across and signal the thread which fired the event when execution is complete. However, if, like me, you are hunting for a best-practice implementation, I’m very happy with how neatly the above solution turned out in the end.
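For the curious, that stripped-down alternative might look something like this (names are illustrative, not code from the service):

//The delegate, its arguments, the result, and a wait handle travel together
class WorkItem
{
    public Delegate Method;
    public object[] Args;
    public object Result;
    public readonly ManualResetEvent Done = new ManualResetEvent(false);
}

//Worker thread side: package the call and hand it to the executing thread's queue
var item = new WorkItem { Method = new Func<int, int>(x => x * 2), Args = new object[] { 21 } };
QueueWorkItem(delegate { item.Result = item.Method.DynamicInvoke(item.Args); item.Done.Set(); });
item.Done.WaitOne(); //block for synchronous semantics, or skip this line for fire-and-forget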
Multi-threaded WebService: “Unable to connect to remote server” April 24, 2007
Posted by codinglifestyle in ASP.NET, IIS, Parallelism. Tags: IIS, Parallelism, ports, tcp/ip, threads, web service
You know you’ve made it into hackerdom when 4000 ports just isn’t enough. I need more power!
I have a web service which spawns potentially thousands of threads. These in turn call a web service in SharePoint to perform a real-time query (the alternative was a single call to the search service, which relies on the index being up to date). I did think to include a throttle to restrict the total number of threads spawned across all calls to the web method (a sketch of that idea follows below). However, even with this number safely at 100 threads, I didn’t account for TCP/IP’s default settings keeping each port alive for 4 minutes. It didn’t take long to put around 4000 ports into a TIME_WAIT state. When my web service then made a call to the SharePoint web service, the result was a System.Net.WebException:
“Unable to connect to remote server” with an inner exception of:
“Only one usage of each socket address (protocol/network address/port) is normally permitted”
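As an aside, the throttle mentioned above was simply a cap on concurrent query threads; something along these lines (a sketch, names illustrative):

//At most 100 worker threads hit the SharePoint web service at once
private static readonly Semaphore _throttle = new Semaphore(100, 100);

void QueryWorker(object state)
{
    _throttle.WaitOne();     //wait for one of the 100 slots
    try
    {
        //...call the SharePoint web service here...
    }
    finally
    {
        _throttle.Release(); //free the slot for the next worker
    }
}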
The solution was simple enough and is common across a number of greedy applications. TCP/IP allows us to up the ante by changing a few parameters in the registry. This allows you to adjust the delay before a port is available again. In addition you may increase the number of ports at your disposal by nearly 60,000. If this isn’t enough, maybe a design change is in order!
Here’s how you do it:
The server handles heavy query volumes more efficiently if the following TCP/IP parameters are set in the Windows® registry:
1. Start the registry editor:
a. Click the Windows Start button.
b. Click Run.
c. Type regedit in the field provided.
d. Click OK.
2. Use the following directory path to navigate to the registry key:
HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters
3. In the right pane of the registry editor, look for the TcpTimedWaitDelay value name. If it is not there, add it by selecting Edit > New > DWORD Value from the menu bar. Type the value name TcpTimedWaitDelay in the name box that appears with the flashing cursor.
Note: If you do not see a flashing cursor and New Value # inside the box, right-click inside the right panel and select Rename from the menu, then type the value name TcpTimedWaitDelay in the name box.
4. Double-click inside the right pane again to set the value of TcpTimedWaitDelay. Select Decimal as the Base, and enter 30 in the Value data field.
5. In the right pane of the registry editor, look for the MaxUserPort value name. If it is not there, add it by selecting Edit > New > DWORD Value from the menu bar. Type the value name MaxUserPort in the name box that appears with the flashing cursor.
Note: If you do not see a flashing cursor and New Value # inside the box, right-click inside the right panel and select Rename from the menu, then type the value name MaxUserPort in the name box.
6. Double-click inside the right pane again to set the value of MaxUserPort. Select Decimal as the Base, and enter 65534 in the Value data field.
7. You must restart Windows for these settings to take effect.
Reference: http://blogs.msdn.com/dgorti/archive/2005/09/18/470766.aspx
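If you’d rather script the change than click through regedit, the same values can be set with the Registry API (a sketch; it must run elevated and Windows still needs a reboot afterwards):

using Microsoft.Win32;

//Set the TIME_WAIT delay to 30 seconds and raise the dynamic port ceiling
using (RegistryKey key = Registry.LocalMachine.OpenSubKey(
    @"SYSTEM\CurrentControlSet\Services\Tcpip\Parameters", true))
{
    key.SetValue("TcpTimedWaitDelay", 30, RegistryValueKind.DWord);
    key.SetValue("MaxUserPort", 65534, RegistryValueKind.DWord);
}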
Update!
After the above changes were made and the customer finally rebooted the server, I ran into a new, exciting System.Net.WebException:
“The underlying connection was closed: An unexpected error occurred on a send.” with a status of: SendFailure
Searching for this exception led me to exactly the information I was looking for. Let’s close down those ports as soon as we are done with them. I have no desire to use up thousands of ports and wear the hacker crown. We can do this by changing the keep-alive state of our web request to false. Funny that this isn’t false by default for a web service, nor is it a public member you can set like the timeout. We have two choices, the high road and the low road. First the low road:
We will alter the generated proxy class directly, meaning your fix may be lost if you update your web reference. In the GetWebRequest function the KeepAlive property must be set to false. This can be accomplished by following these steps:
- Add a Web Reference the normal way (if you haven’t already added one, of course).
- Make sure the Show All Files menu item is enabled in the Project menu.
- In the Solution Explorer window, navigate to:
  - Web References
    - [your web reference]
      - Reference.map
        - Reference.cs (or .vb)
- Open the Reference.cs file and add the following code to the web service proxy class:
protected override System.Net.WebRequest GetWebRequest(Uri uri)
{
    System.Net.HttpWebRequest webRequest =
        (System.Net.HttpWebRequest)base.GetWebRequest(uri);
    webRequest.KeepAlive = false;
    webRequest.ProtocolVersion = HttpVersion.Version10;
    return webRequest;
}
The high road, or the proper way of doing this, is to subclass our web service so that we don’t touch the auto-generated proxy class. Here is a sample:
using System;
using System.Net;
using System.Reflection;

// Web service reference entered in wizard was "MyWebService.MyWebServiceWse"
using MyProject.MyWebService;

namespace MyProject
{
    public class MySubClassedWebService : MyWebServiceWse
    {
        private static PropertyInfo requestPropertyInfo = null;

        public MySubClassedWebService() { }

        protected override System.Net.WebRequest GetWebRequest(Uri uri)
        {
            WebRequest request = base.GetWebRequest(uri);

            // Retrieve property info and store it in a static member for optimizing future use
            if (requestPropertyInfo == null)
                requestPropertyInfo = request.GetType().GetProperty("Request");

            // Retrieve underlying web request
            HttpWebRequest webRequest = (HttpWebRequest)requestPropertyInfo.GetValue(request, null);

            // Setting KeepAlive
            webRequest.KeepAlive = false;

            // Setting protocol version
            webRequest.ProtocolVersion = HttpVersion.Version10;

            return request;
        }
    }
}
Reference: http://weblogs.asp.net/jan/archive/2004/01/28/63771.aspx