Robust Daemon Monitoring Inbox with MailKit w/ OAuth2 September 22, 2022
Posted by codinglifestyle in C#, CodeProject, Parallelism. Tags: authentication, Daemon, Email, Exchange, IMAP, MailKit, MimeKit, O365, OAuth2
It all started with an email sent to a daemon. A Windows service hosting two modules, each of which monitors an inbox for automation, dutifully ignored warnings from IT that basic authentication for O365 would be switched off in several months. Months went by… Even though Microsoft’s announcement came 2 years ago, by the time the right people were informed the deadline was just 2 months away. Sound familiar? Unfortunately, our current IMAP API didn’t support OAuth2 authentication, so it would have to be replaced. Even worse, we wasted weeks sorting out access with our Azure admin even though we had step-by-step instructions from day one.
Investigating mainstream IMAP APIs supporting OAuth2 turned up MailKit, whose author was reassuringly active on GitHub and Stack Overflow. We quickly discovered devs everywhere were addressing this issue and there was much debate on how, or even if, it could be done (some of this doubt came from the author himself). Thankfully, after a painful couple of weeks, we were authenticating a daemon without user interaction (aka the OAuth2 client credentials grant flow).
When it comes to writing an API there is a spectrum in how much of the inner workings is abstracted away and hidden from the user. On one end, an API written to be 1:1 with the server is less usable but may give minute control and transparency, allowing for better debugging. This path requires more ramp-up time and leaves more complexity to the user. On the other end of the spectrum, the API does some of the heavy lifting and aims to provide an easy-to-use interface. A typical trade-off is that the inner workings are a black box which might bite you in the ass down the road.
MailKit is wholeheartedly in the former camp compared with our old API. The old one connected, raised an event when a new email arrived, and disconnected when the service shut down. It was just easier to use overall, from deleting a message to searching for new emails. For example, the email UID was part of the email object. With MailKit this information must be queried separately because technically that’s how it’s stored on the server. And this sets the tone for the entire experience of interacting with MailKit.
As mentioned above, even if MailKit is a bit more difficult to use, it was very reassuring to see how active the author and user community are. While porting code from the old API required a lot of rewriting, there was plenty of documentation, discussion, and examples out there to answer our questions. What was unexpected was that the server events did not work without building a full IMAP client, which reminds me of implementing a Windows message pump, to idle and process events in its own thread. Thankfully documentation and examples, although complicated, were available to build upon.
With the preamble out of the way, we can finally talk about the code. What follows is a C# wrapper for the MailKit API. We can use it in two different ways. You can simply instantiate it and execute a command with 2 lines of code. This will automatically connect, run the IMAP command within the IMAP client thread context, and then disconnect. Alternatively, you can use it as a long-running connection which will launch the IMAP client as a robust task which will stay connected until stopped. This allows the use of an event the wrapper exposes to process new messages. There is also a command queue so that code can be queued to run within the IMAP client thread context.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MailKit;
using MailKit.Net.Imap;
using MailKit.Security;
using Microsoft.Identity.Client;
namespace Codinglifestyle
{
/// <summary>
/// IMAP client instance capable of receiving events and executing IMAP commands
/// </summary>
/// <seealso cref="System.IDisposable" />
public class ImapClientEx : IDisposable
{
#region Member variables
ImapClient _imapClient;
IMailFolder _imapFolder;
int _numMessages;
CancellationTokenSource _tokenCancel;
CancellationTokenSource _tokenDone;
Queue<OnImapCommand> _queueCommand;
bool _messagesArrived;
readonly string _imapServer;
readonly string _imapUser;
readonly string _authAppID;
readonly string _authAppSecret;
readonly string _authTenantID;
readonly SecureSocketOptions _sslOptions;
readonly int _port;
readonly FolderAccess _folderAccess;
protected DateTime _dtLastConnection;
readonly object _lock;
#endregion
#region Ctor
/// <summary>
/// Initializes a new instance of the <see cref="ImapClientEx"/> class.
/// </summary>
/// <param name="userEmail">The user email account.</param>
public ImapClientEx(string userEmail)
{
_queueCommand = new Queue<OnImapCommand>();
_numMessages = 0;
_lock = new object();
Config config = new Config("O365 Settings");
_authAppID = config["App ID"];
_authAppSecret = config.Decrypt("App Secret");
_authTenantID = config["Tenant ID"];
config = new Config("Mail Settings");
_imapServer = config["IMAP Server"];
_imapUser = userEmail;
_sslOptions = SecureSocketOptions.Auto;
_port = 993;
_folderAccess = FolderAccess.ReadWrite;
}
#endregion
#region Public Events
/// <summary>
/// IMAP command delegate to be queued and executed by the IMAP thread instance.
/// </summary>
/// <param name="imapClient">The IMAP client.</param>
/// <param name="imapFolder">The IMAP folder.</param>
public delegate void OnImapCommand(ImapClient imapClient, IMailFolder imapFolder);
/// <summary>
/// Event indicates the IMAP client folder has received a new message.
/// </summary>
/// <remarks>
/// The event is called by the IMAP thread instance.
/// </remarks>
public event OnImapCommand NewMessage;
/// <summary>
/// Fires the new message event.
/// </summary>
private void OnNewMessageEvent(ImapClient imapClient, IMailFolder imapFolder)
{
//Null-conditional invoke avoids a race if the last handler unsubscribes between the check and the call
NewMessage?.Invoke(imapClient, imapFolder);
}
#endregion
#region Public Methods
/// <summary>
/// Runs an IMAP client asynchronously.
/// </summary>
public async Task RunAsync()
{
try
{
//
//Queue first-run event to load new messages since last connection (the consumer must track this)
//
QueueCommand(OnNewMessageEvent);
//
//Run command in robustness pattern asynchronously to let this thread go...
//
await DoCommandAsync((_imapClient, _imapFolder) =>
{
//
//Run IMAP client async in IDLE to listen to events until Stop() is called
//
IdleAsync().Wait();
});
Log.Debug(Identifier + "IMAP client exiting normally.");
}
catch (OperationCanceledException)
{
//Token is cancelled so exit
Log.Debug(Identifier + "IMAP operation cancelled...");
}
catch (Exception ex)
{
Log.Err(ex, Identifier + "RunAsync");
}
finally
{
//
//Disconnect and close IMAP client
//
Dispose();
}
}
/// <summary>
/// Gets a value indicating whether this IMAP client instance is connected.
/// </summary>
public bool IsConnected => _imapClient?.IsConnected == true && _imapFolder?.IsOpen == true;
/// <summary>
/// Identifies this instance for logging.
/// </summary>
public string Identifier => string.Format("IMAP {0} [{1}]: ", _imapUser, Thread.CurrentThread.ManagedThreadId);
/// <summary>
/// Stops this IMAP client instance.
/// </summary>
public void Stop()
{
//Cancel the tokens releasing the IMAP client thread
_tokenDone?.Cancel();
_tokenCancel?.Cancel();
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <remarks>This is safe to call and then carry on using this instance as all the resources will be automatically recreated by error handling</remarks>
public void Dispose()
{
//Cancel tokens
Stop();
//Release connection
DisconnectAsync().Wait();
//Release resources
if (_imapFolder != null)
{
_imapFolder.MessageExpunged -= OnMessageExpunged;
_imapFolder.CountChanged -= OnCountChanged;
}
_imapFolder = null;
_imapClient?.Dispose();
_imapClient = null;
_tokenCancel?.Dispose();
_tokenCancel = null;
_tokenDone?.Dispose();
_tokenDone = null;
}
#endregion
#region IMAP Connect / Idle
/// <summary>
/// Connects IMAP client, authenticated with OAUTH2, and opens the Inbox folder asynchronously.
/// </summary>
private async Task ConnectAsync()
{
//Dispose of existing instance, if any.
if (_imapClient != null)
Dispose();
//
//Create IMAP client
//
_imapClient = new ImapClient();
//
//Create a new cancellation token
//
_tokenCancel = new CancellationTokenSource();
//
//Connect to the server
//
Log.Debug(Identifier + "Connecting to IMAP server: " + _imapServer);
if (!_imapClient.IsConnected)
await _imapClient.ConnectAsync(_imapServer, _port, _sslOptions, _tokenCancel.Token);
//
//Authenticate
//
if (!_imapClient.IsAuthenticated)
{
//
//Create the client application
//
var app = ConfidentialClientApplicationBuilder
.Create(_authAppID)
.WithClientSecret(_authAppSecret)
.WithAuthority(new System.Uri($"https://login.microsoftonline.com/{_authTenantID}"))
.Build();
//
//Get the OAUTH2 token
//
var scopes = new string[] { "https://outlook.office365.com/.default" };
var authToken = await app.AcquireTokenForClient(scopes).ExecuteAsync();
//Note: never log the access token itself; it is a bearer credential
Log.Debug(Identifier + "Acquired OAUTH2 token for {0} (expires {1})", _imapUser, authToken.ExpiresOn);
var oauth2 = new SaslMechanismOAuth2(_imapUser, authToken.AccessToken);
//
//Authenticate
//
Log.Debug(Identifier + "Authenticating user: " + _imapUser);
await _imapClient.AuthenticateAsync(oauth2, _tokenCancel.Token);
}
//
//Open inbox
//
if (!_imapClient.Inbox.IsOpen)
await _imapClient.Inbox.OpenAsync(_folderAccess, _tokenCancel.Token);
// Note: We capture client.Inbox here because cancelling IdleAsync() *may* require
// disconnecting the IMAP client connection, and, if it does, the `client.Inbox`
// property will no longer be accessible which means we won't be able to disconnect
// our event handlers.
_imapFolder = _imapClient.Inbox;
//
//Track changes to the number of messages in the folder (this is how we'll tell if new messages have arrived).
_imapFolder.CountChanged += OnCountChanged;
//Track of messages being expunged to track messages removed to work in combination with the above event.
_imapFolder.MessageExpunged += OnMessageExpunged;
//Track the message count to determine when we have new messages.
_numMessages = _imapFolder.Count;
}
/// <summary>
/// Closes the folder and disconnects IMAP client asynchronously.
/// </summary>
private async Task DisconnectAsync()
{
try
{
//Disconnect IMAP client
if (_imapClient?.IsConnected == true)
await _imapClient.DisconnectAsync(true);
Log.Debug(Identifier + "Disconnected.");
}
catch (Exception)
{
//Best-effort disconnect: the connection may already be gone, so errors are deliberately ignored
}
}
/// <summary>
/// Idles waiting for events or commands to execute asynchronously.
/// </summary>
private async Task IdleAsync()
{
do
{
try
{
//
//Run all queued IMAP commands
//
await DoCommandsAsync();
//
//Idle and listen for messages
//
await WaitForNewMessages();
//
if (_messagesArrived)
{
Log.Debug(Identifier + "New message arrived. Queueing new message event...");
//
QueueCommand(OnNewMessageEvent);
//
_messagesArrived = false;
}
}
catch (OperationCanceledException)
{
//Token is cancelled so exit
Log.Debug(Identifier + "IMAP Idle stopping...");
break;
}
} while (_tokenCancel != null && !_tokenCancel.IsCancellationRequested);
}
/// <summary>
/// Waits for server events or cancellation tokens asynchronously.
/// </summary>
private async Task WaitForNewMessages()
{
try
{
Log.Debug(Identifier + "IMAP idle for 1 minute. Connection age: {0}", DateTime.Now - _dtLastConnection);
if (_imapClient.Capabilities.HasFlag(ImapCapabilities.Idle))
{
//Done token will self-destruct in specified time (1 min)
_tokenDone = new CancellationTokenSource(new TimeSpan(0, 1, 0));
//
//Idle waiting for new events...
//Note: My observation was that the events fired but only after the 1 min token expired
//
await _imapClient.IdleAsync(_tokenDone.Token, _tokenCancel.Token);
}
else
{
//Wait for 1 min
await Task.Delay(new TimeSpan(0, 1, 0), _tokenCancel.Token);
//Ping the IMAP server to keep the connection alive
await _imapClient.NoOpAsync(_tokenCancel.Token);
}
}
catch (OperationCanceledException)
{
Log.Debug(Identifier + "WaitForNewMessages Idle cancelled...");
throw;
}
catch (Exception ex)
{
Log.Warn(ex, Identifier + "WaitForNewMessages errored out...");
throw;
}
finally
{
_tokenDone?.Dispose();
_tokenDone = null;
}
}
#endregion
#region Command Queue
/// <summary>
/// Connects and performs IMAP command asynchronously.
/// </summary>
/// <param name="command">The IMAP command to execute.</param>
/// <param name="retries">The number of times to retry executing the command.</param>
/// <returns>Returns true if the command completed successfully</returns>
/// <exception cref="MailKit.ServiceNotConnectedException">Will enter robustness pattern if not connected and retry later</exception>
public async Task<bool> DoCommandAsync(OnImapCommand command, int retries = -1)
{
int attempts = 1;
int errors = 0;
int connections = 0;
_dtLastConnection = DateTime.Now;
DateTime errorStart = DateTime.Now;
bool bReturn = false;
//Enter robustness pattern do/while loop...
do
{
try
{
//
//Connect, if not already connected
//
if (!IsConnected)
{
Log.Debug(Identifier + "Connection attempt #{0}; retries: {1}; errors: {2}; conns: {3}; total age: {4}",
attempts++,
(retries-- < 0) ? "infinite" : retries.ToString(),
errors,
connections,
DateTime.Now - _dtLastConnection);
//
//Connect to IMAP
//
await ConnectAsync();
//Test IMAP connection
if (!IsConnected)
throw new ServiceNotConnectedException();
Log.Debug($"{Identifier}Server Connection: {IsConnected}");
//Reset connection stats
attempts = 1;
errors = 0;
_dtLastConnection = DateTime.Now;
connections++;
}
//
//Perform command
//
Log.Debug("{0}Run IMAP command: {1}", Identifier, command.Method);
await Task.Run(() => command(_imapClient, _imapFolder), _tokenCancel.Token);
//
//Success: break the do/while loop and exit
//
Log.Debug(Identifier + "Command completed successfully.");
bReturn = true;
break;
}
catch (OperationCanceledException)
{
//Token is cancelled so break the do/while loop and exit
Log.Debug(Identifier + "Command operation cancelled...");
break;
}
catch (Exception ex)
{
//If no retries left log the error
if (retries == 0 && IsConnected)
Log.Err(ex, "{0}Error IMAP command: {1}", Identifier, command.Method);
//If first error since connected...
if (errors++ == 0)
{
//Track time since first error
errorStart = DateTime.Now;
//Reset the IMAP connection
Log.Debug(Identifier + "Error detected - attempt immediate reconnection.");
await DisconnectAsync();
}
else
{
TimeSpan errorAge = (DateTime.Now - errorStart);
Log.Debug(Identifier + "Connect failure (attempting connection for {0})", errorAge);
//Wait and try to reconnect
if (errorAge.TotalMinutes < 10)
{
Log.Debug(Identifier + "Cannot connect. Retry in 1 minute.");
await Task.Delay(new TimeSpan(0, 1, 0), _tokenCancel.Token);
}
else if (errorAge.TotalMinutes < 60)
{
Log.Info(Identifier + "Cannot connect. Retry in 10 minutes.");
await Task.Delay(new TimeSpan(0, 10, 0), _tokenCancel.Token);
}
else
{
Log.Err(ex, Identifier + "Cannot connect. Retry in 1 hour (total errors: {0}).", errors);
await Task.Delay(new TimeSpan(1, 0, 0), _tokenCancel.Token);
}
}
}
} while (retries != 0 && _tokenCancel != null && !_tokenCancel.IsCancellationRequested);
//
//Return true if the command completed successfully
//
return bReturn;
}
/// <summary>
/// Execute the IMAP commands in the queue asynchronously.
/// </summary>
/// <param name="retries">The number of times to retry executing the command.</param>
/// <returns>True if all commands in the queue are executed successfully.</returns>
/// <remarks>Command retries do not apply to the queue which will run indefinitely until empty or cancelled</remarks>
public async Task<bool> DoCommandsAsync(int retries = -1)
{
while (_queueCommand.Count > 0 && _tokenCancel != null && !_tokenCancel.IsCancellationRequested)
{
try
{
//Peek in the command queue for the next command
var command = _queueCommand.Peek();
//
//Execute the Imap command
//
if (await DoCommandAsync(command, retries))
{
//If successful, dequeue and discard the command
lock (_lock)
_queueCommand.Dequeue();
}
//Reset if the command affects folder state
if (_imapClient.IsConnected && !_imapFolder.IsOpen)
_imapFolder.Open(_folderAccess);
}
catch (Exception ex)
{
//We may be disconnected, throw to try again
Log.Warn(ex, Identifier + "DoCommands errored out...");
throw;
}
}
return _queueCommand.Count == 0;
}
/// <summary>
/// Queues a command to be executed by the IMAP client instance.
/// </summary>
/// <param name="command">The command to execute in the IMAP thread.</param>
public void QueueCommand(OnImapCommand command)
{
lock (_lock)
_queueCommand.Enqueue(command);
//If idling, wake up and process the command queue
_tokenDone?.Cancel();
}
#endregion
#region IMAP Events
/// <summary>
/// Called when folder message count changes.
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="EventArgs"/> instance containing the event data.</param>
/// <remarks>CountChanged event will fire when new messages arrive in the folder and/or when messages are expunged.</remarks>
private void OnCountChanged(object sender, EventArgs e)
{
var folder = (ImapFolder)sender;
Log.Debug(Identifier + "{0} message count has changed from {1} to {2}.", folder, _numMessages, folder.Count);
//If the folder count is more than our tracked number of messages flag and cancel IDLE
if (folder.Count > _numMessages)
{
Log.Debug(Identifier + "{0} new messages have arrived.", folder.Count - _numMessages);
// Note: This event is called by the ImapFolder (the ImapFolder is not re-entrant).
// IMAP commands cannot be performed here so instead flag new messages and
// cancel the `done` token to handle new messages in IdleAsync.
_messagesArrived = true;
_tokenDone?.Cancel();
}
//
//Track the message count to determine when we have new messages.
//
_numMessages = folder.Count;
}
/// <summary>
/// Called when a message is expunged (deleted or moved).
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="e">The <see cref="MessageEventArgs"/> instance containing the event data.</param>
private void OnMessageExpunged(object sender, MessageEventArgs e)
{
var folder = (ImapFolder)sender;
Log.Debug(Identifier + "{0} message #{1} has been expunged.", folder, e.Index);
//
//Track the message count to determine when we have new messages.
//
_numMessages = folder.Count;
}
#endregion
}
}
It is worth studying the “robustness pattern” in the DoCommandAsync code. The most likely reason for an exception to be thrown, provided your own code is well written with error handling, is due to a problem with the server connection. This pattern is meant to allow a daemon to reestablish a connection even if it takes hours to do so. The idea is that on the first error it will immediately reconnect and try again. If there is still a connection issue it will wait 1 minute between retries, then 10 minutes, and then ultimately wait an hour before trying to reconnect and run the command. There is also a way of retrying indefinitely or for a specified number of retries.
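To make the retry schedule easier to see at a glance, here is a minimal sketch of it pulled out into a pure function. Note that ReconnectBackoff and NextDelay are hypothetical names of my own for illustration and are not part of the wrapper above, which keeps this logic inline in its catch block.

```csharp
using System;

// Hypothetical helper (not part of the wrapper above): maps the error count
// and how long we have been failing to the delay before the next attempt.
public static class ReconnectBackoff
{
    public static TimeSpan NextDelay(int errorCount, TimeSpan errorAge)
    {
        // First error since the last good connection: reconnect immediately
        if (errorCount <= 1)
            return TimeSpan.Zero;
        // Failing for under 10 minutes: retry every minute
        if (errorAge.TotalMinutes < 10)
            return TimeSpan.FromMinutes(1);
        // Failing for under an hour: retry every 10 minutes
        if (errorAge.TotalMinutes < 60)
            return TimeSpan.FromMinutes(10);
        // Failing for an hour or more: retry hourly
        return TimeSpan.FromHours(1);
    }
}
```

Keeping the schedule in one small function like this would also make it easy to unit test without a mail server.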
It should also be noted that, as in the author’s example, there are two cancellation tokens being used. Both are cancelled via the wrapper by calling Stop or by disposing the wrapper instance. The “done” token is also cancelled when a command is queued, so that we wake up if idling, and likewise when a server event is received.
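The difference between the two tokens can be sketched without MailKit at all. In the snippet below, TwoTokenDemo.WaitAsync is a hypothetical stand-in for the idle call (an assumption for illustration, not MailKit’s implementation): cancelling the “done” token simply ends the wait so the loop can carry on, while cancelling the “cancel” token aborts with an OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an idle call that takes a "done" and a "cancel" token.
public static class TwoTokenDemo
{
    // Returns normally when `done` fires; throws when `cancel` fires.
    public static async Task WaitAsync(CancellationToken done, CancellationToken cancel)
    {
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(done, cancel))
        {
            try
            {
                // Sleep until either token is cancelled
                await Task.Delay(Timeout.Infinite, linked.Token);
            }
            catch (OperationCanceledException)
            {
                // Only a hard cancel propagates; a `done` wake-up returns quietly
                cancel.ThrowIfCancellationRequested();
            }
        }
    }
}
```

This is why queueing a command only needs to cancel the done token: the idle loop wakes, drains the queue, and goes back to idling with a fresh done token.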
First, let’s demonstrate the simple case of connecting and running an IMAP command (such as deleting an email, searching for or fetching details, or moving a message, etc).
//Connect, perform command, and disconnect synchronously
using (var imapClient = new ImapClientEx(_imapUser))
{
//The IMAP client will run the command async so we must Wait to ensure the connection does not close before the command is run
imapClient.DoCommandAsync(MoveEmail, 5).Wait();
}
Notice the using statement for scoping the ImapClientEx wrapper. This code is executed by its own thread; when the command is run, it is run in the IMAP client thread, and the pointer to the function is shunted over from one thread to the other. The wrapper will automatically connect before running the command. While async is supported, in this case we wait, otherwise we would dispose of our IMAP connection too soon.
private void MoveEmail(ImapClient imapClient, IMailFolder imapFolder)
{
//Perform an action with the connected imapClient or the opened imapFolder
}
The command queue takes a delegate with parameters for the MailKit client and folder arguments. It is run by the IMAP client wrapper thread but it is the instance of your object so you have full access to member variables, etc. Again, this is a simple use case but shows how easily the client can connect and run code.
Now let’s move on to the use case where you want to have a long-standing connection to an inbox to monitor new messages. This requires asynchronously launching and storing the IMAP client wrapper. As the client is running it will remain connected and monitor two events as per the author’s example: inbox.CountChanged and inbox.MessageExpunged. By handling these the wrapper can expose its single event: NewMessage. With the IMAP client running, all we have to do is keep the instance in a member variable to queue additional IMAP commands, receive the NewMessage event, or stop the client when we are done.
protected void ImapConnect()
{
// Dispose of existing instance, if any.
if (_imapClient != null)
{
_imapClient.NewMessage -= IMAPProcessMessages;
_imapClient.Stop();
_imapClient = null;
}
_imapClient = new ImapClientEx(_imapUser);
_imapClient.NewMessage += IMAPProcessMessages;
var idleTask = _imapClient.RunAsync();
_dtLastConnection = DateTime.Now;
}
Now it should be noted that a NewMessage event will fire once the IMAP client connects at startup. This is because our daemon needs to be capable of shutting down and therefore must track the last processed message. The best way to do this is to track the last UID processed. This way, whenever the event is fired, you will just search for new UIDs since the last tracked UID was seen.
private void IMAPProcessMessages(ImapClient imapClient, IMailFolder imapFolder)
{
LogSvc.Debug(this, "IMAP: Checking emails...");
_dtLastConnection = DateTime.Now;
//
//Retrieve last index from DB
//
if (_currentUid == 0)
_currentUid = (uint)TaskEmailData.FetchLastUID(_taskType);
LogSvc.Debug(this, "IMAP: Last email index from DB: " + _currentUid.ToString());
//
//Process messages since last processed UID
//
int currentIndex = imapFolder.Count - 1;
if (currentIndex >= 0)
{
//
//Create range from the current UID to the max
//
var range = new UniqueIdRange(new UniqueId((uint)_currentUid + 1), UniqueId.MaxValue);
//
//Get the UIDs newer than the current UID
//
var uids = imapFolder.Search(range, SearchQuery.All);
//
if (uids.Count > 0)
{
LogSvc.Info(this, "IMAP: Processing {0} missed emails.", uids.Count);
foreach (var uid in uids)
{
//
//Get the email
//
var email = imapFolder.GetMessage(uid);
//
//Process and enqueue new message
//
ImapProcessMessage(imapClient, imapFolder, uid, email);
}
//
//Pulse the lock to process new tasks...
//
Pulse();
}
else
{
LogSvc.Debug(this, "IMAP: No missed emails.");
}
}
else
{
LogSvc.Debug(this, "IMAP: No missed emails.");
}
}
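One caveat worth guarding against: under the IMAP spec (RFC 3501) a UID range ending in * always includes the last message in the mailbox, so a search from the last processed UID + 1 to the maximum may hand back the message you already processed. Below is a minimal sketch of a defensive filter. UidFilter and NewerThan are hypothetical names for illustration, and it works on plain uint values (with MailKit I would assume you pass each UniqueId’s Id property).

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: drops any UID we have already processed.
public static class UidFilter
{
    // Keep only UIDs strictly greater than the last processed one, in ascending order.
    public static List<uint> NewerThan(IEnumerable<uint> uids, uint lastProcessedUid)
    {
        return uids.Where(id => id > lastProcessedUid)
                   .OrderBy(id => id)
                   .ToList();
    }
}
```

Filtering this way keeps the first-run NewMessage event harmless even when nothing new has arrived since the daemon last shut down.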
I won’t show you but, suffice it to say, I have one extra level of redundancy in my daemon where it tracks the connection age and simply recycles the connection after a specified amount of time of inactivity. This was done because, while it was more usable, our old IMAP API became disconnected quite regularly although it falsely reported it was still connected.
Lastly, when the daemon is being shut down for any reason, we need to Stop or Dispose to disconnect and clean up the IMAP connection. The Stop will trigger the cancellation tokens such that the IMAP task thread shuts down in its own time. Calling Dispose directly will synchronously do the same. Also, Dispose can be called repeatedly on the wrapper instance and still be safe to use as it will reconnect as necessary.
_imapClient?.Dispose();
This took a couple of weeks to write and test. My boss was cool about it being shared so I hope to save someone else the pain of writing this from scratch. While MailKit may be on the basic end of the spectrum we built a solution that is very robust and will no doubt have a better up-time metric than before. Many thanks to the author and the MailKit user community for all the insight and knowledge necessary to have written this.
Thread Synchronization Between Worker Threads April 15, 2021
Posted by codinglifestyle in Architecture, C#, CodeProject, Parallelism. Tags: BackgroundWorker, ISynchronizeInvoke, Parallelism, synchronization, threads
Many moons ago I wrote the last Windows service I’d ever need. It has a pluggable architecture and today there are over 20 modules doing all manner of business tasks such as sending emails, generating reports, performing automation, etc. Inside each plug-in module is a thread daemon which listens to events to orchestrate the operations of the other supporting module threads. This primarily consists of a thread which provides incoming data (what?), a thread which is the scheduler (when?), and a thread pool of worker threads which process said data (how?).
Recently a new plug-in was created to enforce global trade compliance. A bug crept up in the new worker thread initialization which surprised me because it caused the incoming data queue thread to die even though that code hadn’t changed in years. The reason was that the incoming queue fired the event and therefore it was the thread which executed the delegate (aka function pointer). This function is defined in the thread daemon class which takes care of initializing and starting a worker thread. From an error handling point of view this struck me as less than ideal. So the question was asked:
How do I get the thread daemon to be the thread which executes its own management functions which are triggered by events?
This got me thinking about WinForms apps, whereby a worker thread must marshal execution of a delegate to the main UI thread to update the UI, like changing the status bar. The pattern is to check if an invoke is required (am I the UI thread?) and, if I am not, to use the control’s built-in Invoke (WinForms controls implement ISynchronizeInvoke) to run the delegate in the UI thread.
public delegate void UpdateStatusEvent(string message, params object[] args);
public void UpdateStatus(string message, params object[] args)
{
if (_Status.InvokeRequired)
{
UpdateStatusEvent amc = new UpdateStatusEvent(UpdateStatus);
this.Invoke(amc, new object[] { message, args });
}
else
{
//Now we are the UI thread we can actually update the status bar
_LabelStatus.Text = string.Format(message, args);
_Status.Refresh();
}
}
With this pattern in my head I thought I’d be able to implement this quite easily between my background worker threads. How wrong I was and hence this article! WinForms and WPF provide a synchronization context but, in my case, there isn’t anything in the framework you get for free. From what I understand they both implement a classic Windows message pump which is used to marshal the execution from the worker thread to the UI thread. This will become important shortly, but suffice it to say that after a lot of searching a pre-baked, easy solution wasn’t on offer.
As I continued down this path I searched for ISynchronizeInvoke and IAsyncResult which primarily returned questions and examples of the above pattern. I tried in vain to find an equivalent implementation for worker threads. Surely it wasn’t that complicated to get an event initiated in Thread A to execute in Thread B? I tried using the SynchronizationContext class but quickly discovered it’s just a base class and didn’t do the work of marshalling the execution from Thread A to Thread B. So while I went through the motions the code still executed in the wrong thread context (so why isn’t this class abstract if you must extend it to get it to work?). BTW, the way I was testing to see which thread was running was to refer to Thread.CurrentThread.ManagedThreadId.
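You can see this for yourself with a few lines. The sketch below (SyncContextDemo is a hypothetical name for illustration) calls Send on a plain SynchronizationContext and compares thread IDs: the base class just invokes the delegate inline on the calling thread, and its Post hands the delegate to the thread pool, so neither targets a thread of your choosing.

```csharp
using System;
using System.Threading;

// Hypothetical demo: shows the base SynchronizationContext does no marshalling.
public static class SyncContextDemo
{
    // Returns true if Send ran the callback on the caller's own thread.
    public static bool SendRunsOnCallingThread()
    {
        int callerId = Thread.CurrentThread.ManagedThreadId;
        int callbackId = -1;
        var context = new SynchronizationContext();
        // Base-class Send simply invokes the delegate inline
        context.Send(_ => callbackId = Thread.CurrentThread.ManagedThreadId, null);
        return callbackId == callerId;
    }
}
```

Calling SyncContextDemo.SendRunsOnCallingThread() from any thread should return true, which is exactly the behaviour that tripped me up.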
So now I had wasted a lot of time trying to find an easy answer and had to accept some work on my part would be necessary in this scenario. What struck me was the notion of the classic Windows message pump being crucial for the synchronization context. Basically, it has a message queue running so a thread can enqueue a message and that message can then be executed by another thread when it is able. So in the thread daemon I defined a queue and in the OnDoWork created my rudimentary “message pump”.
private Queue<WaitCallback> _qWorkItems;
private object _oLock;
protected override void OnDoWork(DoWorkEventArgs e)
{
LogSvc.Debug(this, "Running");
while (!CancellationPending)
{
//Critical section
lock (_oLock)
{
//This is the message pump allowing for the thread synchronization context
while (_qWorkItems.Count > 0)
{
LogSvc.Debug(this, "Dequeue and invoke work item on thread {0}", Thread.CurrentThread.ManagedThreadId);
//Dequeue next work item
var workItem = _qWorkItems.Dequeue();
//Execute work item in thread daemon context
workItem.Invoke(null);
}
//Wait for new work items to process
Monitor.Wait(_oLock);
}
}
}
/// <summary>
/// Queues a method for execution. The method executes when the thread daemon is available.
/// </summary>
/// <param name="callback">The callback.</param>
public void QueueWorkItem(WaitCallback callback)
{
LogSvc.Debug(this, "Enqueuing work item from event caller thread {0}", Thread.CurrentThread.ManagedThreadId);
//Critical section
lock (_oLock)
{
_qWorkItems.Enqueue(callback);
Monitor.Pulse(_oLock);
}
}
I started with a Queue<Action> which allowed me to accomplish asynchronous execution. However, I wanted to be able to support synchronous execution as well, so that I could get the return value from the delegate. So I looked at what ThreadPool.QueueUserWorkItem used and settled on WaitCallback.
Now we have our thread daemon set up to queue and execute operations in its thread context. What we need next is a synchronization context to allow the worker threads to marshal the delegate and data from their thread to the thread daemon thread. We’ll implement both ISynchronizeInvoke and IAsyncResult classes to nicely encapsulate this functionality. This will offer a test to see if an invoke is required and support both asynchronous and synchronous execution of the event delegate.
/// <summary>
/// Internally used by ThreadSynchronization to represent asynchronous operations
/// </summary>
/// <seealso cref="System.IAsyncResult" />
class ThreadAsyncResult : IAsyncResult
{
/// <summary>
/// Gets a value that indicates whether the asynchronous operation has completed.
/// </summary>
public bool IsCompleted { get; set; }
/// <summary>
/// Gets a <see cref="T:System.Threading.WaitHandle" /> that is used to wait for an asynchronous operation to complete.
/// </summary>
public WaitHandle AsyncWaitHandle { get; internal set; }
object _state;
/// <summary>
/// Gets a user-defined object that qualifies or contains information about an asynchronous operation.
/// </summary>
public object AsyncState
{
get
{
if (Exception != null)
{
throw Exception;
}
return _state;
}
internal set
{
_state = value;
}
}
/// <summary>
/// Gets a value that indicates whether the asynchronous operation completed synchronously.
/// </summary>
public bool CompletedSynchronously { get { return IsCompleted; } }
/// <summary>
/// Gets or sets the exception.
/// </summary>
/// <value>
/// The exception.
/// </value>
internal Exception Exception { get; set; }
}
/// <summary>
/// Thread synchronization context to marshal delegate and data to ThreadManager thread
/// </summary>
/// <seealso cref="System.ComponentModel.ISynchronizeInvoke" />
class ThreadSynchronization : ISynchronizeInvoke
{
public readonly int _nExecutingContextID = 0;
private ThreadManager _manager;
/// <summary>
/// Initializes a new instance of the <see cref="ThreadSynchronization"/> class.
/// </summary>
/// <param name="manager">The thread manager object</param>
public ThreadSynchronization(ThreadManager manager)
{
_nExecutingContextID = Thread.CurrentThread.ManagedThreadId;
_manager = manager;
Log.Debug("Synchronization context created for thread {0}", _nExecutingContextID);
}
/// <summary>
/// Gets a value indicating whether the caller must call <see cref="M:System.ComponentModel.ISynchronizeInvoke.Invoke(System.Delegate,System.Object[])" /> when calling an object that implements this interface.
/// </summary>
public bool InvokeRequired => Thread.CurrentThread.ManagedThreadId != _nExecutingContextID;
/// <summary>
/// Asynchronously executes the delegate on the thread that created this object.
/// </summary>
/// <param name="method">A <see cref="T:System.Delegate" /> to a method that takes parameters of the same number and type that are contained in <paramref name="args" />.</param>
/// <param name="args">An array of type <see cref="T:System.Object" /> to pass as arguments to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
/// <returns>
/// An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling this method.
/// </returns>
public IAsyncResult BeginInvoke(Delegate method, object[] args)
{
var result = new ThreadAsyncResult();
var manualResetEvent = new ManualResetEvent(false);
result.AsyncWaitHandle = manualResetEvent;
_manager.QueueWorkItem(delegate
{
try
{
//Invoke the delegate and capture the return value
result.AsyncState = method.DynamicInvoke(args);
}
catch (Exception ex)
{
Log.Err(ex);
//Capture the exception
result.Exception = ex;
}
finally
{
//Mark complete
result.IsCompleted = true;
//Set event for anyone waiting
manualResetEvent.Set();
}
});
return result;
}
/// <summary>
/// Waits until the process started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" /> completes, and then returns the value generated by the process.
/// </summary>
/// <param name="result">An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" />.</param>
/// <returns>
/// An <see cref="T:System.Object" /> that represents the return value generated by the asynchronous operation.
/// </returns>
public object EndInvoke(IAsyncResult result)
{
//If not complete then wait until done
if (!result.IsCompleted)
{
result.AsyncWaitHandle.WaitOne();
}
//The return value of the delegate
return result.AsyncState;
}
/// <summary>
/// Synchronously executes the delegate on the thread that created this object and marshals the call to the creating thread.
/// </summary>
/// <param name="method">A <see cref="T:System.Delegate" /> that contains a method to call, in the context of the thread for the control.</param>
/// <param name="args">An array of type <see cref="T:System.Object" /> that represents the arguments to pass to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
/// <returns>
/// An <see cref="T:System.Object" /> that represents the return value from the delegate being invoked, or <see langword="null" /> if the delegate has no return value.
/// </returns>
public object Invoke(Delegate method, object[] args)
{
//Get IAsyncResult operation
var result = BeginInvoke(method, args);
//Wait for asynchronous operation to complete
EndInvoke(result);
//The return value of the delegate
return result.AsyncState;
}
}
So notice that ThreadSynchronization is tied to our thread daemon object, which implements QueueWorkItem. You could expose access to QueueWorkItem a different way if you wish. So, at long last, we have everything set up and we’re ready to alter the events themselves. These events, located within the thread daemon class, would otherwise have executed in another worker thread’s execution context. By instantiating the ThreadSynchronization object we can test if an invoke is required, enqueue the work to execute on the thread daemon thread, and even get the return result of the event.
bool Incoming_Dequeued(object oData)
{
bool bReturn = false;
//If the event is called by a thread other than thread daemon...
if (Sync.InvokeRequired)
{
//Marshal delegate call and data to thread daemon context
var result = Sync.Invoke(new Func<object, bool>(Incoming_Dequeued), new object[] { oData });
bReturn = TypeParser.ParseBool(result, false);
return bReturn;
}
//Execute this code in the context of the thread daemon
return bReturn;
}
void ThreadPool_ThreadComplete(IModuleThread sender, object oResult)
{
//If the event is called by a thread other than thread daemon...
if (Sync.InvokeRequired)
{
//Marshal delegate call and data to thread daemon context
Sync.Invoke(new Action<IModuleThread, object>(ThreadPool_ThreadComplete), new object[] { sender, oResult });
return;
}
//Execute this code in the context of the thread daemon
}
At last, here is the pattern I was looking for, one all too familiar to anyone who has worked on WinForms or WPF. Now we can easily check whether we’re on the correct thread and, if not, do an asynchronous or synchronous invoke of the delegate and data. When invoking, it’s easy to use an Action<TIn1, TIn2> or Func<TIn1, TIn2, TOut>, as required, to generate your delegate.
In conclusion, you can see why Microsoft didn’t have a prepackaged solution to this, as they couldn’t presume the executing thread would implement a message pump in a strict fashion. They did provide ISynchronizeInvoke, which also needs IAsyncResult. Just creating these objects and implementing their interfaces lays bare what you need to do. While I love how it encapsulates this functionality in a familiar manner, it’s not strictly necessary. Really, just the implementation of the message pump in our executing thread, along with a composite object containing the delegate, data, and a lock, would be enough to marshal the required pieces across and signal the thread that fired the event when execution is complete. However, if, like me, you are hunting for a best practice implementation, I’m very happy with how neatly the above solution turned out in the end.
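To illustrate that stripped-down alternative, here is a minimal sketch of a composite work item (delegate, data, and a signal) pumped by a single dedicated thread. The names WorkItem and MiniPump are hypothetical and are not part of the service described above, and BlockingCollection is simply my assumption for the queue; treat it as a sketch rather than a drop-in replacement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical composite object: the delegate, its data, and a completion signal.
sealed class WorkItem
{
    public Delegate Method;
    public object[] Args;
    public object Result;
    public Exception Error;
    public readonly ManualResetEventSlim Done = new ManualResetEventSlim(false);
}

// Hypothetical single-threaded message pump standing in for the thread daemon.
sealed class MiniPump : IDisposable
{
    private readonly BlockingCollection<WorkItem> _queue = new BlockingCollection<WorkItem>();
    private readonly Thread _thread;

    public int ThreadId { get { return _thread.ManagedThreadId; } }

    public MiniPump()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    private void Run()
    {
        // Blocks until an item arrives; ends after CompleteAdding once the queue is drained.
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            try { item.Result = item.Method.DynamicInvoke(item.Args); }
            catch (Exception ex) { item.Error = ex.InnerException ?? ex; }
            finally { item.Done.Set(); }
        }
    }

    // Synchronous marshal: enqueue, then block the caller until the pump thread signals.
    // Calling this from the pump thread itself would deadlock, so check ThreadId first.
    public object Invoke(Delegate method, params object[] args)
    {
        var item = new WorkItem { Method = method, Args = args };
        _queue.Add(item);
        item.Done.Wait();
        if (item.Error != null) throw item.Error;
        return item.Result;
    }

    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

A caller on any other thread can then write `var doubled = (int)pump.Invoke(new Func<int, int>(x => x * 2), 21);` and the lambda runs on the pump thread. What this version gives up is the familiar InvokeRequired / BeginInvoke / EndInvoke surface that ISynchronizeInvoke provides.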
Software Architect Conference 2012 November 19, 2012
Posted by codinglifestyle in Architecture, ASP.NET, CodeProject, Parallelism.Tags: architect, requirements, software architecture, software design, technical debt
I was fortunate enough to have the opportunity to attend the Software Architect Conference this year in London. This is the same group which puts on DevWeek. It was short and sweet, just 2 days without the additional sessions before and after. Often with the daily grind you simply don’t have the time or inclination to challenge yourself with the sort of material presented at these conferences. This is what makes them unique: for a few precious days you are free of distractions to consider how and why we do what we do. I certainly found it useful and some of the speakers were truly impressive. While the technology we use continues to change at the speed of light, the great thing about software architecture is that many of the basic principles of building a stable, well-engineered system haven’t changed since medieval times.
Keynote
-
Theme: 21st century architects should aspire to be like medieval “master builders”
- 7 years apprentice, many years to master, administers the project, deals with client, but still a master mason
- Keep coding – credibility with team, mitigates ivory tower
-
20th century software architects
- Stepped away from the code
- UML
- Analysis paralysis
- Ivory Tower syndrome
-
Architecture traps
- Enterprise Architecture Group – not sustainable, disconnected
- CV driven development – ego and fun over needs and requirements
- Going “Post-technical” – no longer involved in programming
-
Software Architecture summed up
- Create a shared vision – get everyone to move in the same direction
-
Architectural lessons learnt lost in Agile – baby out with the bath water
- It is a myth that there is a conflict between good software architecture and agile
-
What we do
- Requirements and constraints
- Evaluate and vet technology
- Design software
- Architectural evaluation
- Code!
- Maintainability
- Technical ownership
- Mentoring
- True team leadership is collaborative / mentoring
- Big picture: Just enough architecture to provide vision enough to move forward
Architectural Styles
-
Architectural definition defines 3 things
- What are the structural elements of the system?
- How are they related to each other?
- What are the underlying principles and rationale to the previous 2 questions?
-
Procedural
- Decompose a program into smaller pieces to help achieve modifiability.
- Single threaded sequential execution
-
RPC Model
- Still procedural: single thread of control
-
Threads
- Decouples activities from main process but still procedural
- Shared data must be immutable or copied
- Some people, when confronted with a problem, think, “I know, I’ll use threads,” and then two they hav erpoblesms.
-
Event based, Implicit Invocation
- The components are modules whose interfaces provide both a collection of procedures and a set of events
- Extensible / free plumbing
- Inversion of control (not dependency inversion)
-
Messaging
- Asynchronous way to interact reliably
- Instead of threads and shared memory use process independent code and message passing
-
Layers
- Regardless of interactions and coupling between different parts of a system, there is a need to develop and evolve them independently
- Each layer having a separate and distinct responsibility following a reasoned and clear separation of concerns
- Often “partitioned” but not true layers due to cross references which sneak in
-
Alternate Layers – spherical
- Core – domain model
- Inner crust – services wrapped around core
- Outer crust – wrapped external dependencies
-
Micro-kernel / Plug-in
- Small hub with everything plugged in
- Separates a minimal functional core from extended functionality and customer-specific parts
-
Shared repository
- DB and the like
- Procedures secondary, data is king!
- Maintain all data in a central repository shared by all functional components of the data-driven application and let the availability, quality, and state of that data trigger and coordinate the control flow of the application logic.
-
Pipes & Filters
- Divide the application’s task into several self-contained data processing steps and connect these steps to a data processing pipeline via intermediate data buffers.
- Process & queue → process & queue → process & queue
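The pipes and filters style above maps quite directly onto code. The following is a minimal sketch, assuming BlockingCollection as the intermediate buffer and one task per filter; the Pipeline class and its two toy filters (square, then format) are hypothetical and only there to show the shape.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class Pipeline
{
    // One filter: reads from its input buffer, transforms each item,
    // writes to its output buffer, then closes the output when the input is exhausted.
    public static Task Filter<TIn, TOut>(BlockingCollection<TIn> input,
                                         BlockingCollection<TOut> output,
                                         Func<TIn, TOut> transform)
    {
        return Task.Run(() =>
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                    output.Add(transform(item));
            }
            finally
            {
                output.CompleteAdding();
            }
        });
    }

    public static List<string> Run(IEnumerable<int> source)
    {
        // Bounded buffers so a fast stage cannot run arbitrarily far ahead of a slow one.
        var raw = new BlockingCollection<int>(4);
        var squared = new BlockingCollection<int>(4);
        var formatted = new BlockingCollection<string>(4);

        // Process & queue -> process & queue
        var stage1 = Filter(raw, squared, n => n * n);
        var stage2 = Filter(squared, formatted, n => "value=" + n);

        // Feed the pipeline on its own task so this thread is free to drain the far end.
        var feeder = Task.Run(() =>
        {
            foreach (var n in source) raw.Add(n);
            raw.CompleteAdding();
        });

        var results = new List<string>();
        foreach (var s in formatted.GetConsumingEnumerable())
            results.Add(s);

        Task.WaitAll(feeder, stage1, stage2);
        return results;
    }
}
```

Because each stage here has exactly one worker, items come out in the order they went in. Adding more workers per stage would raise throughput at the cost of that ordering guarantee.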
The Architecture of an Asynchronous Application
- Heavy focus on messaging throughout talk
-
About Messaging
- Guaranteed delivery at a cost
- Reliable and scalable
-
Subscription models
- 1 : n
- Round robin
- Publish / Subscribe
-
Messaging Terms
- Idempotency – will doing something twice change data / state?
- Poison message – situation where a message keeps being redelivered (perhaps because an exception is thrown before an ack is returned to queue)
-
Messaging platforms
- MSMQ – MS specific (personally found it easy enough to use)
- IBM MQ
- NServiceBus
- RabbitMQ – multi-platform, multi-language bindings. Mentioned in numerous talks and the focus of this talk.
-
SignalR – interesting client-side messaging platform could be a more powerful model than using web services on the client
- install-package SignalR with NuGet
- Picks best available connection method
- Push from server to client
- Broadcast to all or to a specific client
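The two messaging terms above, idempotency and poison messages, are easier to see in code. This is a minimal sketch with no real broker involved: Message, AccountConsumer, the delivery limit of 3, and the dead-letter list are all hypothetical stand-ins, and parking a message after repeated failures is just one common mitigation, not something the talk prescribed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape; real brokers supply their own IDs and delivery counts.
sealed class Message
{
    public string Id;
    public decimal Amount;
    public int DeliveryCount;
}

sealed class AccountConsumer
{
    private const int MaxDeliveries = 3;
    private readonly HashSet<string> _processed = new HashSet<string>();

    public decimal Balance { get; private set; }
    public List<Message> DeadLetters { get; private set; }

    public AccountConsumer() { DeadLetters = new List<Message>(); }

    // Returns true when the message can be acknowledged (removed from the queue).
    public bool Handle(Message msg)
    {
        msg.DeliveryCount++;

        // Idempotency: a redelivered message that was already applied changes nothing.
        if (_processed.Contains(msg.Id)) return true;

        try
        {
            if (msg.Amount < 0) throw new InvalidOperationException("negative amount");
            Balance += msg.Amount;
            _processed.Add(msg.Id);
            return true;
        }
        catch (InvalidOperationException)
        {
            // Poison message: after too many failed deliveries, park it instead of retrying forever.
            if (msg.DeliveryCount >= MaxDeliveries)
            {
                DeadLetters.Add(msg);
                return true;
            }
            return false; // not acknowledged, so the queue would redeliver it
        }
    }
}
```

Handling the same valid message twice leaves the balance unchanged after the first delivery, while a message that always throws is handed back twice and then moved aside on the third attempt.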
Async with C# 5
- This talk is largely about Tasks and iterates through several examples of an application trying various asynchronous styles. The point is to try to get a minimal syntax such that an asynchronous application can be written in the same number of lines as a procedural program.
-
Context – must know the identity of which thread is executing. Critical in UIs and error handling
- SynchronizationContext class can revert thread context to calling thread (as can several other methods such as Invoke)
-
Tasks – a piece of asynchronous functionality
- Uses continuations to handle results
-
Async keyword – marks a function to allow use of the await keyword. Must return void, Task, or Task<T>.
private async void CalculatePi()
{
// Start the asynchronous calculation; this returns a task immediately.
Task<double> task = CalculatePiAsync();
// Yield until the task completes, then unwrap its value.
double pi = await task;
// Display the result.
textBox1.Text += pi;
}
- Put a try/catch around the await and the compiler will ensure that the error is rethrown in the correct context.
- Automatic use of thread pool which measures throughput to scale number of running threads up or down, as appropriate
-
Progress / Cancellation Features
- IProgress<T>
-
Can launch a collection of tasks and then use different operation types such as
- var task = Task.WhenAny(tasks);
- which returns when the first task completes. Or use Task.WhenAll to wait for all tasks.
- WCF can generate the async methods to use tasks when adding Service References -> Advanced.
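To tie the progress, cancellation, and WhenAny / WhenAll notes together, here is a minimal sketch. AsyncDemo and SumAsync are hypothetical names of mine, and Task.Delay merely stands in for real asynchronous work; only IProgress<T>, CancellationToken, Task.WhenAny, and Task.WhenAll come from the framework.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class AsyncDemo
{
    // Hypothetical worker: sums 1..count, reporting percent complete and honouring cancellation.
    public static async Task<int> SumAsync(int count, IProgress<int> progress, CancellationToken token)
    {
        int sum = 0;
        for (int i = 1; i <= count; i++)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(1, token); // stand-in for real asynchronous work
            sum += i;
            if (progress != null) progress.Report(i * 100 / count);
        }
        return sum;
    }

    public static async Task<int[]> RunAllAsync()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task<int>[] tasks = { SumAsync(3, null, cts.Token), SumAsync(10, null, cts.Token) };

            // WhenAny completes as soon as one of the tasks finishes and hands that task back.
            Task<int> first = await Task.WhenAny(tasks);
            int firstResult = await first;
            Console.WriteLine("First finished with " + firstResult);

            // WhenAll completes when every task has finished; results keep the input order.
            return await Task.WhenAll(tasks);
        }
    }
}
```

In a UI you would typically pass `new Progress<int>(p => progressBar.Value = p)` (progressBar being whatever control you have), since Progress<T> captures the synchronization context it was created on and raises its callback there. Calling Cancel on the token source makes the awaited task throw OperationCanceledException, which the try/catch note above covers.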
Inside Requirements
- Kevlin Henney, author of 97 Things Every Programmer Should Know and Pattern Oriented SW Arch
- While listening to requirements we often stop listening while jumping ahead to solutions
- Killer question when cutting through nefarious design agendas: “What problem does this solve?”
- Patterns often misapplied – using a hammer to drive a screw leading to a pattern zoo
- Composing a solution to a problem rather than analysis to understand the problem
- Many to many relationships don’t need to be normalized (they model the real world)
- Describing is not the same as prescribing
-
A model is an abstraction of a point of view for a purpose
- Good – omits irrelevant detail
- Bad – omits necessary detail
-
RM-ODP: reference model using viewpoints, a way of looking at a system / environment
- Enterprise – What does it do for the business?
- Information – What does it need to know?
- Computational – Decomposition into parts and responsibilities
- Engineering – Relationship of parts
- Technology – How will we build it?
-
Use Case
-
Use inverted pyramid style to place the most important detail at the top. Move post-condition next to pre-condition. Sequence, containing detail about how you accomplish the steps in between pre and post, goes at the bottom as it is only of interest to implementers.
- Intent
- Pre-condition
- Post-condition
- Sequence – lots of juicy detail but actually least important from an architecture point of view
-
-
User Story
-
Traditional Connextra form
- As a <role>,
- I want <goal/desire>
-
So that <benefit>
- As an Account Holder
- I want to withdraw cash from an ATM
- So that I can get money when the bank is closed
-
Dan North scenario form
- Given <a context>
- When <a particular event occurs>
-
Then <an outcome is expected>
- Scenario 1: Account has sufficient funds
- Given the account balance is $100
- And the card is valid
- And the machine contains enough money
- When the Account Holder requests $20
- Then the ATM should dispense $20
- And the account balance should be $80
- And the card should be returned
-
-
Problems with the Use Case / User Story approach
- Observations are always made through a filter or world-view
- Until told what to observe you don’t know what you’ll get. In that case, is it even relevant?
- Use Case Diagrams neglect to notice they are fundamentally text/stories
-
Context Diagrams – shows the world and relationships around the system (UML actors)
- Litmus test: what industry does the diagram apply to?
- Not a technical decomposition
-
You’re an engineer planning to build a bridge across a river. So you visit the site. Standing on one bank of the river, you look at the surrounding land, and at the river traffic. You feel how exposed the place is, and how hard the wind is blowing and how fast the river is running. You look at the bank and wonder what faults a geological survey will show up in the rocky terrain. You picture to yourself the bridge that you are going to build. (Software Requirements & Specifications: “The Problem Context”)
- An analyst trying to understand a software development problem must go through the same process as the bridge engineer. He starts by examining the various problem domains in the application domain. These domains form the context into which the planned Machine must fit. Then he imagines how the Machine will fit into this context. And then he constructs a context diagram showing his vision of the problem context with the Machine installed in it.
- Problem Frame approach – describe a problem in diagrams
-
Grady Booch
- Use centric – visualization and manipulation of objects in a domain
- Datacentric – integrity persisting objects
- Computational centric – focus on transforming objects
- In summary: move from ignorance / assumptions → knowledge gathered from multiple points of view
A Team, A System, Some Legacy… and you
- Legacy System – so valuable it can’t be turned off (and it’s paid for!)
- Be aware a legacy system often comes with a legacy team engrained in their own methods
-
Being late to the party
- Software architecture often seems valuable only once things have gone wrong.
- Architects often join existing projects to help improve difficult situations
- Often a real sense of urgency to “improve”
- Avoid distancing self to ivory tower and likewise avoid digging in thus losing big picture focus
- Software architecture techniques offer a huge value for older or troubled projects. Especially techniques to understand where you are and with whom
-
Stage 1: Understand
-
Right perspective
- See gathering requirements for perspectives of end user, business management, IT Managers, development, and support
-
Automated analysis tools
- NDepend, Lattix, Structure101, Sonar
- Dependency analysis
- Metrics
-
Monitor / Measure
-
Leverage existing production metrics
- IIS
- Oracle Enterprise Manager
- Implementation metrics
- Stakeholder opinions
-
-
Architectural Assessment
-
Systems Quality Assessment
- Context and stakeholder requirements
- Functional and deployment views
- Monitor and measure
- Automated analysis
-
Assessment Patterns
- ATAM – Architecture Tradeoff Analysis Method
- LAAAM – Lightweight architectural assessment method- more practical
-
TARA – tiny architectural review approach (recommended)
-
-
Minimal Modelling
- Define notation / terminology
-
Break up system to different viewpoints
- Functional
- Data
- Code
- Runtime
- Deployment – systems / services
- Ops – run, controlled, roll-back
- Focus on essentials for target audience
-
Deliverable:
- System context and requirements
- Functionality and deployment views
- Improve Analysis
- Requirements Assessment
- Identify and report
- Conclusion for sponsor
- Deliver findings and recommendations
-
-
Stage 2: Improve
- The team must be involved, or you risk damaging morale, confidence, and competence
-
Choices based on risk
- Assess -> Prioritize -> Analyse -> Mitigate
-
Engage in Production
-
Why
- Reality check
-
How
- Monitoring, stats, and incident management
-
Who
- Business management, IT management, support
-
-
Tame the Support Burden
- Drain on development
- Support team can offset this
- Avoid “over the wall” mentality
-
Continuous Integration and Deployment
- Start simple
- Increased efficiency and reliability
-
Automated Testing
- Unit test + coverage, regression tests
- Costly
-
Safe step evolution
- Control risk
- Wrap with tests
- Partition
- Simplify
- Improve
- Generalize
- Repeat
-
Stay coding – but if a pure architect stay off the critical path
- Beware ROI of your coding skills vs. architect’s skills
- Refactor, write unit tests, address warnings
-
Define the future
- Good for the team
- Clear, credible system architecture for the medium term (1-2 years)
- Beware: timing and predictions
Technical Debt
- As an evolving program is continually changed, its complexity (reflecting deteriorating structure) increases unless work is done to maintain or reduce it
- Technical Debt is a metaphor developed by Ward Cunningham to help us think about the above statement and choices we make about the work required to maintain a system
- Like a financial debt, the technical debt incurs interest payments, which comes in the form of the extra effort that we have to do in future development because of the quick and dirty design choice. We can choose to continue paying the interest, or we can pay down the principal by refactoring the quick and dirty design into a better design
- Sometimes, upon reflection, it is better to pay interest. But are we trapped paying so much interest we can never get ahead?
-
What is the language of debt?
- Amortise, repayment, balance, write off, restructure, asset, interest, default, credit rating, liability, principal, load, runaway, loan, consolidation, spiralling, value
- Shipping first time code is like going into debt. A little debt can speed delivery so long as it is paid back promptly with a rewrite
- The danger is ignoring or not paying back the debt (compound interest!)
- Rebuttal: A mess is not a technical debt. A mess is just a mess.
- Counter response: The useful distinction isn’t between debt or non-debt, but between prudent and reckless debt.
-
There is also a difference between deliberate debt and inadvertent debt.
- There is little excuse for introducing reckless debt
- Awareness of technical debt is the responsibility of all roles
- Consideration of debt must involve practice and process
-
Management of technical debt must account for business value
-
Perfection isn’t possible, but understanding the ideal is useful
Books, People, and Topics of Note
- Simon Brown – www.codingarchitecture.com
- Allen Holub – www.holub.com
- Kevlin Henney – Pattern Oriented Software Architecture
- Grady Booch – architecture vs. design
- Linda Rising
- George Fairbanks – Just Enough Software Architecture
- Roy Osherove – Notes to a Software Team Leader
- Top 10 Traits of a Rockstar Software Developer
- Becoming a Technical Leader – Gerald Weinberg
- 101 Things I Learned in Architecture School
- Architecting Enterprise Solutions
- Software Architecture – Perspectives of an Emerging Discipline
- Software Requirements and Specification – Michael Jackson
- Problem Frames – Michael Jackson
- 12 Essential Skills For SW Arch
- Refactoring to Patterns
- Managing Software Debt
- Modernizing Legacy Systems
- Working Effectively with Legacy Code
- Growing Object-Oriented Software, Guided by Tests
- Knockout.js – MVVM javascript library. Takes JSON and allows you to connect to HTML in a simple way I presume w/o the manual jQuery work of redrawing your control (e.g. autocomplete textbox)
- Backbone.js – model / view extension with events
- Parasoft Jtest smoke test
- Selenium automation UI test
- RabbitMQ – client side messaging queue
- LightStreamer / SignalR – web sockets for client (stop gap for HTML5?)
Understanding BackgroundWorker and Encapsulating your own Thread Class February 22, 2011
Posted by codinglifestyle in C#, CodeProject, Parallelism.Tags: asynchronous, BackgroundWorker, class, encapsulate, events, not firing, synchronization, thread
You may have come across this page if you were searching for any of the following:
- BackgroundWorker events not firing
- BackgroundWorker RunWorkerCompleted event not firing
- BackgroundWorker threads frozen
- Encapsulate thread class
Yesterday my web page was launching several worker threads and waiting for them to return to amalgamate the results into a single data set to bind to a grid. Launching several worker threads and waiting for a join is a common pattern. To nicely encapsulate the thread itself I derived a class from BackgroundWorker. BackgroundWorker has many advantages such as using an event model, thread pool, and all the thread plumbing built right in. All you have to do is override OnDoWork and off you go. The RunWorkerCompleted event was used, in conjunction with a lock, to collect the data once the worker thread finished.
Everything looked good but for some reason the event never fired. The problem was that I had gotten myself into a deadlock scenario. My expectation was that when the event fires, the delegate method runs in the context of the thread which fired it. If this were true, my synchronization logic would have operated normally and not deadlocked. The reality is that BackgroundWorker goes out of its way to run this event on the calling thread. It does this so that, when BackgroundWorker is used in conjunction with the UI, no invoke is required (an exception is thrown if a worker thread tries to touch the UI’s controls, which is why you normally have to check InvokeRequired).
When in doubt, use something like this to check the identity of the thread executing the code:
Trace.WriteLine(string.Format("Thread id {0}", System.Threading.Thread.CurrentThread.ManagedThreadId));
After putting the above trace in the code I could clearly see that the ID of my main thread was identical to the thread ID in the RunWorkerCompleted event. Once the code tried to acquire the lock it was all over.
So the solution in my case was not to use the RunWorkerCompleted event. I added an alternative event to my thread class and called that at the end of OnDoWork. The event executed in the context of the thread, as expected, and my synchronization logic worked fine. But I couldn’t help feeling it was a bit of a kludge and pondered whether I should be deriving from BackgroundWorker at all. However, what’s the alternative? There really aren’t other alternatives to BackgroundWorker built in to the framework but it is easy to create your own. See below:
/// <summary>
/// Abstract base class which performs some work and stores it in a data property
/// </summary>
/// <typeparam name="T">The type of data this thread will procure</typeparam>
public abstract class ThreadBase<T>
{
    #region Public methods
    /// <summary>
    /// Does the work asynchronously and fires the OnComplete event
    /// </summary>
    public void DoWorkAsync()
    {
        DoWorkAsync(null);
    }
    /// <summary>
    /// Does the work asynchronously and fires the OnComplete event
    /// </summary>
    /// <param name="argument">The argument object</param>
    public void DoWorkAsync(object argument)
    {
        ThreadPool.QueueUserWorkItem(DoWorkHelper, argument);
    }
    /// <summary>
    /// Does the work and populates the Data property
    /// </summary>
    public void DoWork()
    {
        DoWork(null);
    }
    /// <summary>
    /// Does the work and populates the Data property
    /// </summary>
    /// <param name="argument">The argument object</param>
    /// <remarks>
    /// Can be called to run synchronously, which doesn't fire the OnComplete event
    /// </remarks>
    public abstract void DoWork(object argument);
    #endregion
    #region Private methods
    private void DoWorkHelper(object argument)
    {
        DoWork(argument);
        if (OnComplete != null)
            OnComplete.Invoke(this, Data);
    }
    #endregion
    #region Properties
    public T Data { get; protected set; }
    #endregion
    #region Events
    /// <summary>
    /// Delegate which is invoked when the thread has completed
    /// </summary>
    /// <param name="thread">The thread.</param>
    /// <param name="data">The data.</param>
    public delegate void ThreadComplete(ThreadBase<T> thread, T data);
    /// <summary>
    /// Occurs when the thread completes.
    /// </summary>
    /// <remarks>This event operates in the context of the thread</remarks>
    public event ThreadComplete OnComplete;
    #endregion
}
My generic ThreadBase class is a lightweight base class substitute for BackgroundWorker providing the flexibility to call it synchronously or asynchronously, a generically typed Data property, and an OnComplete event. The OnComplete event will execute in the thread’s context so synchronization of several threads won’t be a problem. Take a look at it in action:
public class MyThread : ThreadBase<DateTime>
{
    public override void DoWork(object argument)
    {
        Trace.WriteLine(string.Format("MyThread thread id {0}", System.Threading.Thread.CurrentThread.ManagedThreadId));
        Data = DateTime.Now;
    }
}
What a nicely encapsulated thread! Below we can see how cleanly a MyThread can be used:
MyThread thread = new MyThread();
thread.OnComplete += new ThreadBase<DateTime>.ThreadComplete(thread_OnComplete);
thread.DoWorkAsync();

void thread_OnComplete(ThreadBase<DateTime> thread, DateTime data)
{
    Trace.WriteLine(string.Format("Complete thread id {0}: {1}", Thread.CurrentThread.ManagedThreadId, data));
}
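Since the original scenario was launching several workers and waiting for all of them, here is a minimal sketch of that join. To keep it compilable on its own it carries a condensed copy of ThreadBase<T>; SquareThread, JoinDemo, and the use of CountdownEvent for the join are my own assumptions, not the code from the web page in question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Condensed copy of ThreadBase<T> so the sketch stands on its own.
public abstract class ThreadBase<T>
{
    public delegate void ThreadComplete(ThreadBase<T> thread, T data);
    public event ThreadComplete OnComplete;
    public T Data { get; protected set; }
    public abstract void DoWork(object argument);

    public void DoWorkAsync(object argument)
    {
        ThreadPool.QueueUserWorkItem(state =>
        {
            DoWork(state);
            var handler = OnComplete;
            if (handler != null) handler(this, Data); // runs on the worker thread
        }, argument);
    }
}

// Hypothetical worker: squares the integer it is given.
public class SquareThread : ThreadBase<int>
{
    public override void DoWork(object argument)
    {
        int n = (int)argument;
        Data = n * n;
    }
}

public static class JoinDemo
{
    public static List<int> SquareAll(int[] inputs)
    {
        var results = new List<int>();
        var gate = new object();

        using (var pending = new CountdownEvent(inputs.Length))
        {
            foreach (int n in inputs)
            {
                var worker = new SquareThread();
                worker.OnComplete += (thread, data) =>
                {
                    lock (gate) { results.Add(data); } // collect under a lock
                    pending.Signal();                  // one fewer worker outstanding
                };
                worker.DoWorkAsync(n);
            }

            // The launching thread holds no lock while it waits, and the callbacks
            // never need this thread in order to run, so the join cannot deadlock.
            pending.Wait();
        }

        results.Sort(); // completion order is not deterministic
        return results;
    }
}
```

Note that an exception thrown inside DoWork would surface on the thread pool thread and leave the countdown unsignalled, so real code would want a try/finally around the work.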
Then I got to thinking: what if I wanted the best of both worlds? Thanks to Reflector I found out how BackgroundWorker’s RunWorkerCompleted event executes in the context of the calling thread. My generic ThreadBaseEx class offers two events: OnCompleteByThreadContext and OnCompleteByCallerContext.
/// <summary>
/// Abstract base class which performs some work and stores it in a data property
/// </summary>
/// <typeparam name="T">The type of data this thread will procure</typeparam>
public abstract class ThreadBaseEx<T>
{
    #region Private variables
    private AsyncOperation _asyncOperation;
    private readonly SendOrPostCallback _operationCompleted;
    #endregion
    #region Ctor
    public ThreadBaseEx()
    {
        _operationCompleted = new SendOrPostCallback(AsyncOperationCompleted);
    }
    #endregion
    #region Public methods
    /// <summary>
    /// Does the work asynchronously and fires the OnComplete events
    /// </summary>
    public void DoWorkAsync()
    {
        DoWorkAsync(null);
    }
    /// <summary>
    /// Does the work asynchronously and fires the OnComplete events
    /// </summary>
    /// <param name="argument">The argument object</param>
    public void DoWorkAsync(object argument)
    {
        //Captures the caller's synchronization context for the completion callback
        _asyncOperation = AsyncOperationManager.CreateOperation(null);
        ThreadPool.QueueUserWorkItem(DoWorkHelper, argument);
    }
    /// <summary>
    /// Does the work and populates the Data property
    /// </summary>
    public void DoWork()
    {
        DoWork(null);
    }
    /// <summary>
    /// Does the work and populates the Data property
    /// </summary>
    /// <param name="argument">The argument object</param>
    /// <remarks>
    /// Can be called to run synchronously, which doesn't fire the OnComplete events
    /// </remarks>
    public abstract void DoWork(object argument);
    #endregion
    #region Private methods
    private void DoWorkHelper(object argument)
    {
        DoWork(argument);
        if (OnCompleteByThreadContext != null)
            OnCompleteByThreadContext.Invoke(this, Data);
        _asyncOperation.PostOperationCompleted(this._operationCompleted, argument);
    }
    private void AsyncOperationCompleted(object arg)
    {
        //Guard against no subscribers, as with the thread-context event
        if (OnCompleteByCallerContext != null)
            OnCompleteByCallerContext(this, Data);
    }
    #endregion
    #region Properties
    public T Data { get; protected set; }
    #endregion
    #region Events
    /// <summary>
    /// Delegate which is invoked when the thread has completed
    /// </summary>
    /// <param name="thread">The thread.</param>
    /// <param name="data">The data.</param>
    public delegate void ThreadComplete(ThreadBaseEx<T> thread, T data);
    /// <summary>
    /// Occurs when the thread completes.
    /// </summary>
    /// <remarks>This event operates in the context of the worker thread</remarks>
    public event ThreadComplete OnCompleteByThreadContext;
    /// <summary>
    /// Occurs when the thread completes.
    /// </summary>
    /// <remarks>This event operates in the context of the calling thread</remarks>
    public event ThreadComplete OnCompleteByCallerContext;
    #endregion
}
Your encapsulated thread will be the same as above, but now with two events allowing either scenario, depending on what suits.
Late binding DataGridView data with BackgroundWorker threads October 18, 2010
Posted by codinglifestyle in C#, Parallelism, Uncategorized, Winform.Tags: BackgroundWorker, DataGridView, late binding, threads
In an increasingly multicore world you may notice your WinForm app never pushes your machine to 100%. Great, right? Erm, no. Winforms are traditionally single threaded applications meaning we typically only tap into 1/2, 1/4, or even 1/8th of our processor’s potential.
I’ve recently been working on a utility containing a DataGridView for displaying work item data from TFS. Some of the column data had to be calculated so when displaying 500+ records the whole app was slowing down considerably. What I wanted was a delayed binding such that the cell would be initially blank, launch a thread, and update itself when the thread returned with the value. It turned out this was pretty easy.
First, use a data entity. You probably don’t have to, but I find having this layer offers an obvious place to add the changes I’ll cover below. The ideal place is in the bound property’s getter. Here you can see that the value is not yet calculated, launch a thread, and return blank or another default value in the meantime.
private int _nWeight = -1;
public int Weight
{
    get
    {
        if (_nWeight < 0)
        {
            Tfs.GetWeight(Tag, GetWeightComplete);
            _nWeight = 0;
        }
        return _nWeight;
    }
}
private void GetWeightComplete(object sender, RunWorkerCompletedEventArgs e)
{
    _nWeight = (int)e.Result;
    if (Row < FormMain.Instance.Grid.Rows.Count)
        FormMain.Instance.Grid.InvalidateRow(Row);
}
The property above represents the weight of the entity to be used in sorting or graphing by importance. Calculating the weight is a time intensive operation and is a great candidate for calculating in a worker thread. Notice a backing store, _nWeight, is used to check if the value has been calculated and also to cache the value. If _nWeight is uncached (-1) we launch the thread and return a default weight of 0 while the thread calculates the actual value. Notice when we call Tfs.GetWeight we pass the delegate, GetWeightComplete, as an argument. This function will ultimately be called when the thread returns.
public static void GetWeight(WorkItem wi, RunWorkerCompletedEventHandler onCompleteEvent)
{
    BackgroundWorker worker = new BackgroundWorker();
    worker.DoWork += new DoWorkEventHandler(GetWeightDoWork);
    worker.RunWorkerCompleted += onCompleteEvent;
    worker.RunWorkerAsync(wi);
}
private static void GetWeightDoWork(object sender, DoWorkEventArgs e)
{
    WorkItem wi = (WorkItem)e.Argument;
    int result = 0;
    foreach (Revision rev in wi.Revisions)
    {
        if (rev.Fields[CoreField.ChangedBy].Value.ToString() == wi.Store.TeamFoundationServer.AuthenticatedUserDisplayName)
            result += 1;
    }
    result = Math.Min(result, 10);
    e.Result = result;
}
When a call is made to GetWeight you can see it uses the standard System.ComponentModel.BackgroundWorker class to manage the work. This has two main advantages: 1) an easy-to-use asynchronous event-based pattern and 2) it uses the thread pool for optimal thread management. Notice the two events, DoWork and RunWorkerCompleted, are wired up before the thread is launched and that we can pass an argument via RunWorkerAsync. GetWeightDoWork is called when the thread is launched and stores the result in the event arguments. When we leave this function the RunWorkerCompleted event is raised.
Finally, back in the data entity, GetWeightComplete is called when the thread has calculated the value. The result is taken from the RunWorkerCompletedEventArgs and set to the backing store. The form uses the singleton pattern and exposes the grid as a property (see here). This allows the data entity to easily invalidate the row, which redraws the row taking the Weight value into account (in my case the weight determined the intensity of a yellow highlight drawn over the row indicating how often the authenticated user appeared in the work item’s revision history).
The user experience when using the above method is truly fantastic. You get a responsive UI which immediately displays information, with the calculated values coming in a few seconds later. The standard binding method of the DataGridView further enhances this experience by only binding the data currently shown to the user. So if only the first 25 rows are displayed, only those values will be calculated. As we scroll down to show more rows the DataGridView will calculate only the newly bound rows (saving loads of time for potentially 1000’s of rows never shown). Good luck unlocking your other cores for a better UI experience.
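For readers who want to try the late-binding idea outside of a TFS project, here is a minimal, self-contained sketch of the same getter pattern. It is not the code from my utility: LazyWeightEntity, the calculate delegate, and the WeightLoaded event (standing in for Grid.InvalidateRow) are hypothetical names. It also assumes you want two small safeguards: only one worker is ever launched per entity, and a failed calculation leaves the default value in place.

```csharp
using System;
using System.ComponentModel;
using System.Threading;

// Hypothetical entity; the names are illustrative, not from the original utility.
public class LazyWeightEntity
{
    private const int NotLoaded = -1;
    private int _weight = NotLoaded;
    private readonly Func<int> _calculate;

    // Raised when the background calculation finishes (stand-in for Grid.InvalidateRow).
    public event EventHandler WeightLoaded;

    public LazyWeightEntity(Func<int> calculate)
    {
        _calculate = calculate;
    }

    public int Weight
    {
        get
        {
            // Atomically switch NotLoaded -> 0 so only the first reader launches a worker.
            if (Interlocked.CompareExchange(ref _weight, 0, NotLoaded) == NotLoaded)
            {
                var worker = new BackgroundWorker();
                worker.DoWork += (s, e) => e.Result = _calculate();
                worker.RunWorkerCompleted += OnWeightCalculated;
                worker.RunWorkerAsync();
            }
            // Returns the default (0) until the worker has stored the real value.
            return Volatile.Read(ref _weight);
        }
    }

    private void OnWeightCalculated(object sender, RunWorkerCompletedEventArgs e)
    {
        // Reading e.Result throws if DoWork failed, so check e.Error first.
        if (e.Error == null)
            Interlocked.Exchange(ref _weight, (int)e.Result);

        WeightLoaded?.Invoke(this, EventArgs.Empty);
    }
}
```

In a WinForm app the completion handler runs on the UI thread, so the WeightLoaded subscriber can invalidate the row directly. In a console app there is no synchronization context and the handler runs on a thread pool thread instead.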
Yet Another VS2010 Overview June 18, 2010
Posted by codinglifestyle in ASP.NET, C#, Parallelism, Visual Studio 2010.
Today I attended a mediocre presentation by Paul Fallen which looked stellar compared to the atrocious overview put on at the Galway VS2010 Launch Event. Paul had the look of a man who had seen these slides many times and glossed over them at speed. In fairness, he was using the same presentation deck I’ve seen since TechEd 2008. I think we had all seen several flavours of this overview by this time, so nobody seemed to mind. Below are the few snippets of information to add to the smorgasbord of other snippets I’ve gleaned from other talks of this nature.
Please click here for more comprehensive posts earlier on VS2010.
Here is the VS2010 Training Kit which was used in the demos.
- Common Language Runtime
- Latest version is CLR 4 (to go with .NET 4).
- Previous version, CLR 2, encompassed .NET 2, 3, 3.5, 3.5SP1
- Implications
- App pool .NET Framework version
- Incompatibilities running CLR frameworks side by side within same process
- Think 3 COM objects accessing Outlook all using CLR1, 2, and 4
- Managed Extensibility Framework (MEF)
- Library in .NET that enables greater reuse of applications and components
- VS2010 & C# 4.0
- IDE
- WPF editor – Ctrl + mouse wheel to zoom. Handy for presentations
- Box select (like command prompt selection)
- Breakpoint labelling, import/export, Intellitrace (covered below)
- Code navigation improvements (Ctrl + , and Ctrl + - for back)
- Call Hierarchy
- Allows you to visualize all calls to and from a selected method, property, or constructor
- Improved Intellisense
- Greatly improved javascript intellisense
- Support for camel case
- Can toggle (Ctrl + Alt + Space) between suggestion mode and completion mode; suggestion (consume-first) mode is handy for TDD
- Test run record, fast forward
- Better multi-monitor support, docking enhancements
- Tracking requirements and tasks as work items
- Better control over ClientID
- Routing moved out from MVC to general ASP.NET
- Optional and named parameters
- Improved website publishing, ClickOnce (see prev. posts)
- Parallelism
- Pillars
- Task Parallel Library (TPL)
- He didn’t touch at all on the new task concept
- Parallel LINQ (PLINQ)
- These are the extension methods to LINQ that turn query operators into parallel operations, for example:
    var matches = from person in people.AsParallel()
                  where person.FirstName == "Bob"
                  select person;
- System.Threading significant updates
- Coordination Data Structures (CDS)
- Lightweight and scalable thread-safe data structures and synchronization primitives
- Toolset
- Debugger: record and visualize threads
- Visualizer: View multiple stacks
- IntelliTrace – new capability to record execution, play it backwards and forwards, even email it to another engineer and have them reproduce the problem on their box
- Other
- Eventual deprecation of ThreadPool as higher-level abstractions are layered atop it?
- Unified cancellation using cancellation token
- Dynamic Language Runtime (DLR)
- New feature in CLR 4
- Major new feature in C# 4 is dynamic type
- What LINQ was to C# 3.0
- Adds shared dynamic type system, standard hosting model and support to make it easy to generate fast dynamic code
- Big boost working with COM: type equivalence, embedded interop, managed marshalling
- Windows Communication Foundation (WCF)
- Service discovery
- Easier to discover endpoints
- Imagine an IM chat program or devices that come and go
- REST support via WCF WebHttp Services
- Available in the code gallery templates
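To make the PLINQ and unified cancellation bullets a little more concrete, here is a small sketch combining the two. It was not shown in the talk: the Person class and the PlinqSketch.FindByFirstName helper are hypothetical names of my own, while AsParallel and WithCancellation are the standard PLINQ extension methods in System.Linq.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical data type used only for this illustration.
public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

public static class PlinqSketch
{
    // Runs the filter in parallel; the token lets the caller abandon the query.
    public static List<Person> FindByFirstName(
        IEnumerable<Person> people, string firstName, CancellationToken token)
    {
        return people.AsParallel()
                     .WithCancellation(token)
                     .Where(person => person.FirstName == firstName)
                     .ToList();
    }
}
```

A caller would typically create a CancellationTokenSource, pass its Token in, and call Cancel() from elsewhere (a button click, a timeout) to stop the query. Note that PLINQ does not preserve source order unless AsOrdered() is added.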
Multi-threaded WebService: “Unable to connect to remote server” April 24, 2007
Posted by codinglifestyle in ASP.NET, IIS, Parallelism.Tags: IIS, Parallelism, ports, tcp/ip, threads, web service
2 comments
You know you’ve made it into hackerdom when 4000 ports just isn’t enough. I need more power!
I have a webservice which spawns potentially thousands of threads. These in turn call a webservice in SharePoint to perform a real-time query (the alternative would be a single call to the search service, which relies on the index being up-to-date). I did think to include a throttle which would restrict the total number of threads spawned across all calls to the webmethod. However, even with this number safely at 100 threads, I didn’t account for TCP/IP’s default settings keeping each port alive for 4 minutes after use. It didn’t take long to put around 4000 ports in a TIME_WAIT state. When my webservice would then make a call to the SharePoint webservice, this would result in a System.Net.WebException:
“Unable to connect to remote server” with an inner exception of:
“Only one usage of each socket address (protocol/network address/port) is normally permitted”
The solution was simple enough and is common across a number of greedy applications. TCP/IP allows us to up the ante by changing a few parameters in the registry. This allows you to adjust the delay before a port is available again. In addition you may increase the number of ports at your disposal by nearly 60,000. If this isn’t enough, maybe a design change is in order!
Here’s how you do it:
The product handles heavy query volumes more efficiently if the following TCP/IP parameters are set in the Windows® registry:
1. Start the registry editor:
a. Click the Windows Start button.
b. Click Run.
c. Type regedit in field provided.
d. Click OK
2. Use the following directory path to navigate to the registry key:
HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters
3. In the right pane of the registry editor, look for the TcpTimedWaitDelay value name. If it is not there, add it by selecting Edit > New > DWORD Value from the menu bar. Type the value name TcpTimedWaitDelay in the name box that appears with the flashing cursor.
Note: If you do not see a flashing cursor and New Value # inside the box, right-click inside the right panel and select Rename from the menu, then type the value name TcpTimedWaitDelay in the name box.
4. Double-click inside the right pane again to set the value of TcpTimedWaitDelay. Select Decimal as the Base, and enter 30 in the Value data field.
5. In the right pane of the registry editor, look for the MaxUserPort value name. If it is not there, add it by selecting Edit > New > DWORD Value from the menu bar. Type the value name MaxUserPort in the name box that appears with the flashing cursor.
Note: If you do not see a flashing cursor and New Value # inside the box, right-click inside the right panel and select Rename from the menu, then type the value name MaxUserPort in the name box.
6. Double-click inside the right pane again to set the value of MaxUserPort. Select Decimal as the Base, and enter 65534 in the Value data field.
7. You must restart Windows for these settings to take effect
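If you would rather not click through regedit on every server, steps 2 to 6 can be captured in a .reg file. This is only a sketch assuming the same values as above: 30 seconds for TcpTimedWaitDelay (hex 1e) and 65534 for MaxUserPort (hex fffe). Back up the key first, import the file as an administrator, and the restart in step 7 still applies.

```
Windows Registry Editor Version 5.00

[HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters]
"TcpTimedWaitDelay"=dword:0000001e
"MaxUserPort"=dword:0000fffe
```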
Reference: http://blogs.msdn.com/dgorti/archive/2005/09/18/470766.aspx
Update!
After the above changes were made and the customer finally rebooted the server, I ran into a new, exciting System.Net.WebException:
“The underlying connection was closed: An unexpected error occurred on a send.” with a status of: SendFailure
Searching for this exception led me to exactly the information I was looking for. Let’s close down those ports as soon as we are done with them. I have no desire to use up thousands of ports and wear the hacker crown. We can do this by changing the keep-alive state of our web request to false. Funny that this isn’t false by default for a webservice, nor is it a public member to set like the timeout. We have two choices, the high road and the low road. First the low road:
We will alter the generated proxy class directly, meaning your fix may be lost if you update your web reference. In the GetWebRequest function the KeepAlive property must be set to false. This can be accomplished by following these steps:
- Add a Web Reference in the normal way (if you haven’t already added one, of course).
- Make sure the Show All Files menu item is enabled in the Project menu.
- In the Solution Explorer window, navigate to:
    - Web References
        - Reference.map
            - Reference.cs (or .vb)
- Open the Reference.cs file and add the following code to the webservice proxy class:
protected override System.Net.WebRequest GetWebRequest(Uri uri)
{
    System.Net.HttpWebRequest webRequest =
        (System.Net.HttpWebRequest)base.GetWebRequest(uri);
    // Close the connection after each call so the port is not held open
    webRequest.KeepAlive = false;
    webRequest.ProtocolVersion = System.Net.HttpVersion.Version10;
    return webRequest;
}
The high road, or proper way of doing this, is to subclass our webservice such that we don’t touch the auto-generated proxy class. Here is a sample:
using System;
using System.Net;
using System.Reflection;
// Web service reference entered in wizard was "MyWebService.MyWebServiceWse"
using MyProject.MyWebService;

namespace MyProject
{
    public class MySubClassedWebService : MyWebServiceWse
    {
        private static PropertyInfo requestPropertyInfo = null;

        public MySubClassedWebService() { }

        protected override System.Net.WebRequest GetWebRequest(Uri uri)
        {
            WebRequest request = base.GetWebRequest(uri);

            // Retrieve property info and store it in a static member for optimizing future use
            if (requestPropertyInfo == null)
                requestPropertyInfo = request.GetType().GetProperty("Request");

            // Retrieve underlying web request
            HttpWebRequest webRequest = (HttpWebRequest)requestPropertyInfo.GetValue(request, null);

            // Setting KeepAlive
            webRequest.KeepAlive = false;
            // Setting protocol version
            webRequest.ProtocolVersion = HttpVersion.Version10;
            return request;
        }
    }
}
Reference: http://weblogs.asp.net/jan/archive/2004/01/28/63771.aspx