jump to navigation

Thread Synchronization Between Worker Threads April 15, 2021

Posted by codinglifestyle in Architecture, C#, CodeProject, Parallelism.
Tags: , , , ,
trackback

Many moons ago I wrote the last Windows service I’d ever need. It has a pluggable architecture and today there are over 20 modules doing all manner of business tasks such as sending emails, generating reports, performing automation, etc. Inside each plug-in module is a thread daemon which listens to events to orchestrate the operations of the other supporting module threads. This primarily consists of a thread which provides incoming data (what?), a thread which is the scheduler (when?), and a thread pool of worker threads which process said data (how?).

Recently a new plug-in was created to enforce global trade compliance. A bug crept up in the new worker thread initialization which surprised me because it caused the incoming data queue thread to die even though that code hadn’t changed in years. The reason was that the incoming queue fired the event and therefore it was the thread which executed the delegate (aka function pointer). This function is defined in the thread daemon class which takes care of initializing and starting a worker thread. From an error handling point of view this struck me as less than ideal. So the question was asked:

How do I get the thread daemon to be the thread which executes it’s own management functions which are triggered by events?

This got me thinking about WinForm apps whereby a worker thread must marshal execution of a delegate to the main UI thread to update the UI like changing the status bar. The pattern is to check if an invoke is required (am I the UI thread?) and if not then you use the built in SynchronizationObject to invoke the delegate in the UI thread.

        public delegate void UpdateStatusEvent(string message, params object[] args);

        public void UpdateStatus(string message, params object[] args)
        {
            if (_Status.InvokeRequired)
            {
                UpdateStatusEvent amc = new UpdateStatusEvent(UpdateStatus);
                this.Invoke(amc, new object[] { message, args });
            }
            else
            {
                //Now we are the UI thread we can actually update the status bar
                _LabelStatus.Text = string.Format(message, args);
                _Status.Refresh();
            }
        }

With this pattern in my head I thought I’d be able to implement this quite easily between my background worker threads. How wrong I was and hence this article! WinForms and WPF provide a synchronization context but there isn’t something in the framework you get for free in my case. From what I understand they both implement a classic windows message pump which is used to marshal the execution from the worker thread to the UI thread. This will become important shortly, but suffice it to say after a lot of searching a pre-baked, easy solution wasn’t on offer.

As I continued down this path I searched for ISynchronizeInvoke and IAsyncResult which primarily returned questions and examples of the above pattern. I tried in vein to find an equivalent implementation except for worker threads. Surely it wasn’t that complicated to get an event initiated in Thread A to execute in Thread B? I tried using the SynchronizationContext class but quickly discovered it’s just a base class and didn’t do the work of marshalling the execution from Thread A to Thread B. So while I went through the motions the code still executed in the wrong thread context (so why isn’t this class abstract if you must extend it to get it to work?). BTW, the way I was testing to see which thread was running was to refer to Thread.CurrentThread.ManagedThreadId.

So now I had wasted a lot of time trying to find an easy answer and had to accept some work on my part would be necessary in this scenario. What struck me was the notion of the classic windows pump being crucial for the synchronization context. Basically that it has a message queue running so a thread can enqueue a message and that message then be executed by another thread when it is able. So in the thread daemon I defined a queue and in the OnDoWork created my rudimentary “message pump”.

        private Queue<WaitCallback> _qWorkItems;
        private object _oLock;
      
        protected override void OnDoWork(DoWorkEventArgs e)
        {
            LogSvc.Debug(this, "Running");

            while (!CancellationPending)
            {
                //Critical section
                lock (_oLock)
                {
                    //This is the message pump allowing for the thread synchronization context
                    while (_qWorkItems.Count > 0)
                    {
                        LogSvc.Debug(this, "Dequeue and invoke work item on thread {0}", Thread.CurrentThread.ManagedThreadId);

                        //Dequeue next work item
                        var workItem = _qWorkItems.Dequeue();
                        
                        //Execute work item in thread daemon context
                        workItem.Invoke(null);
                    }

                    //Wait for new work items to process
                    Monitor.Wait(_oLock);
                }
            }
        }

        /// <summary>
        /// Queues a method for execution.  The method executes when the thread daemon is available.
        /// </summary>
        /// <param name="callback">The callback.</param>
        public void QueueWorkItem(WaitCallback callback)
        {
            LogSvc.Debug(this, "Enqueuing work item from event caller thread {0}", Thread.CurrentThread.ManagedThreadId);

            //Critical section
            lock (_oLock)
            {
                _qWorkItems.Enqueue(callback);
                Monitor.Pulse(_oLock);
            }
        }

I started with a Queue<Action> which allowed me to accomplish asynchronous execution. However I wanted to be able to support synchronous execution as well to support getting the return value from the delegate. So I looked at what ThreadPool.EnqueueUserWorkItem used and settled on WaitCallback.

Now we have our thread daemon setup to queue and execute operations in it’s thread context. What we need next is a synchronization context to allow the worker threads to marshal the delegate and data from their thread to the thread daemon thread. We’ll implement both ISynchronizeInvoke and IAsyncResult classes to nicely encapsulate this functionality. This will offer a test to see if an invoke is required and support both asynchronous and synchronous execution of the event delegate.

    /// <summary>
    /// Internally used by ThreadSynchronization to represent asynchronous operations
    /// </summary>
    /// <seealso cref="System.IAsyncResult" />
    class ThreadAsyncResult : IAsyncResult
    {
        /// <summary>
        /// Gets a value that indicates whether the asynchronous operation has completed.
        /// </summary>
        public bool IsCompleted { get; set; }

        /// <summary>
        /// Gets a <see cref="T:System.Threading.WaitHandle" /> that is used to wait for an asynchronous operation to complete.
        /// </summary>
        public WaitHandle AsyncWaitHandle { get; internal set; }

        object _state;
        /// <summary>
        /// Gets a user-defined object that qualifies or contains information about an asynchronous operation.
        /// </summary>
        public object AsyncState
        {
            get
            {
                if (Exception != null)
                {
                    throw Exception;
                }
                return _state;
            }
            internal set
            {
                _state = value;
            }
        }

        /// <summary>
        /// Gets a value that indicates whether the asynchronous operation completed synchronously.
        /// </summary>
        public bool CompletedSynchronously { get { return IsCompleted; } }

        /// <summary>
        /// Gets or sets the exception.
        /// </summary>
        /// <value>
        /// The exception.
        /// </value>
        internal Exception Exception { get; set; }
    }

    /// <summary>
    /// Thread synchronization context to marshal delegate and data to ThreadManager thread
    /// </summary>
    /// <seealso cref="System.ComponentModel.ISynchronizeInvoke" />
    class ThreadSynchronization : ISynchronizeInvoke
    {
        public readonly int _nExecutingContextID = 0;
        private ThreadManager _manager;
        
        /// <summary>
        /// Initializes a new instance of the <see cref="ThreadSynchronization"/> class.
        /// </summary>
        /// <param name="manager">The thread manager object</param>
        public ThreadSynchronization(ThreadManager manager)
        {
            _nExecutingContextID = Thread.CurrentThread.ManagedThreadId;
            _manager             = manager;

            Log.Debug("Synchronization context created for thread {0}", _nExecutingContextID);
        }

        /// <summary>
        /// Gets a value indicating whether the caller must call <see cref="M:System.ComponentModel.ISynchronizeInvoke.Invoke(System.Delegate,System.Object[])" /> when calling an object that implements this interface.
        /// </summary>
        public bool InvokeRequired => Thread.CurrentThread.ManagedThreadId != _nExecutingContextID;

        /// <summary>
        /// Asynchronously executes the delegate on the thread that created this object.
        /// </summary>
        /// <param name="method">A <see cref="T:System.Delegate" /> to a method that takes parameters of the same number and type that are contained in <paramref name="args" />.</param>
        /// <param name="args">An array of type <see cref="T:System.Object" /> to pass as arguments to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
        /// <returns>
        /// An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling this method.
        /// </returns>
        public IAsyncResult BeginInvoke(Delegate method, object[] args)
        {
            var result                 = new ThreadAsyncResult();
            var manualResetEvent       = new ManualResetEvent(false);
            result.AsyncWaitHandle     = manualResetEvent;

            _manager.QueueWorkItem(delegate
            {
                try
                {
                    //Invoke the delegate and capture the return value
                    result.AsyncState  = method.DynamicInvoke(args);
                }
                catch (Exception ex)
                {
                    Log.Err(ex);
                    //Capture the exception
                    result.Exception   = ex;
                }
                finally
                {
                    //Mark complete
                    result.IsCompleted = true;
                    //Set event for anyone waiting
                    manualResetEvent.Set();
                }
            });

            return result;
        }

        /// <summary>
        /// Waits until the process started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" /> completes, and then returns the value generated by the process.
        /// </summary>
        /// <param name="result">An <see cref="T:System.IAsyncResult" /> interface that represents the asynchronous operation started by calling <see cref="M:System.ComponentModel.ISynchronizeInvoke.BeginInvoke(System.Delegate,System.Object[])" />.</param>
        /// <returns>
        /// An <see cref="T:System.Object" /> that represents the return value generated by the asynchronous operation.
        /// </returns>
        public object EndInvoke(IAsyncResult result)
        {
            //If not complete then wait until done
            if (!result.IsCompleted)
            {
                result.AsyncWaitHandle.WaitOne();
            }

            //The return value of the delegate
            return result.AsyncState;
        }

        /// <summary>
        /// Synchronously executes the delegate on the thread that created this object and marshals the call to the creating thread.
        /// </summary>
        /// <param name="method">A <see cref="T:System.Delegate" /> that contains a method to call, in the context of the thread for the control.</param>
        /// <param name="args">An array of type <see cref="T:System.Object" /> that represents the arguments to pass to the given method. This can be <see langword="null" /> if no arguments are needed.</param>
        /// <returns>
        /// An <see cref="T:System.Object" /> that represents the return value from the delegate being invoked, or <see langword="null" /> if the delegate has no return value.
        /// </returns>
        public object Invoke(Delegate method, object[] args)
        {
            //Get IAsyncResult operation
            var result = BeginInvoke(method, args);
            //Wait for asynchronous operation to complete
            EndInvoke(result);
            //The return value of the delegate
            return result.AsyncState;
        }
    }

So notice that ThreadSynchronization is tied to our thread daemon object which implements QueueWortItem. You could expose access to QueueWorkItem a different way if you wish. So, at long last, we have everything setup so we’re ready to alter the events themselves. These events, located within the thread daemon class, would have executed in another worker thread’s execution context. By instantiating the ThreadSynchronization object we can test if an invoke is required and enqueue the work to execute on the thread daemon thread and even get the return result of the event.

        bool Incoming_Dequeued(object oData)
        {
            bool bReturn = false;

            //If the event is called by a thread other than thread daemon...
            if (Sync.InvokeRequired)
            {
                //Marshal delegate call and data to thread daemon context
                var result = Sync.Invoke(new Func<object, bool>(Incoming_Dequeued), new object[] { oData });
                bReturn    = TypeParser.ParseBool(result, false);

                return bReturn;
            }

            //Execute this code in the context of the thread daemon

            return bReturn;
        }

        void ThreadPool_ThreadComplete(IModuleThread sender, object oResult)
        {
            //If the event is called by a thread other than thread daemon...
            if (Sync.InvokeRequired)
            {
                //Marshal delegate call and data to thread daemon context
                Sync.Invoke(new Action<IModuleThread, object>(ThreadPool_ThreadComplete), new object[] { sender, oResult });
                return;
            }

            //Execute this code in the context of the thread daemon
        }

At last here is the pattern I was looking for all too familiar to anyone who has worked on WinForms or WPF. Now we can easily see if we’re the correct thread and if not do an asynchronous or synchronous invoke of the delegate and data. When invoking it’s easy to use an Action<TIn1, TIn2> or Func<TIn1, Tin2, TOut>, as required, to generate your delegate.

In conclusion, you can see why Microsoft didn’t have a prepackaged solution to this as they couldn’t presume the executing thread would implement a message pump in a strict fashion. They did provide ISynchronizeInvoke which also needs IAsyncResult. Just creating these objects and implementing their interfaces lays bare what you need to do. While I love how it encapsulates this functionality in a familiar manner, it’s not strictly necessary. Really just the implementation of the message pump in our executing thread along with a composite object containing the delegate, data, and a lock would be enough to marshal the required pieces across and signal the thread who fired the event when execution is complete. However, if like me, you are hunting for a best practice implementation I’m very happy with how neatly the above solution turned out in the end.

Comments»

No comments yet — be the first.

Leave a comment