// ThreadExtensions.cs

#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
 
namespace ThreadExtensions;
/// <summary>
/// A work queue bound to a single thread. Any thread may queue delegates with
/// <see cref="InvokeAsync{TResult}(Func{TResult})"/> or <see cref="InvokeAsync(Delegate)"/>;
/// they are executed, in FIFO order, by the bound thread while it is inside
/// <see cref="Run(CancellationToken)"/>.
/// </summary>
public class Dispatcher : IDisposable
{
    /// <summary>Registry mapping a thread to the dispatcher registered for it.</summary>
    internal static ConcurrentDictionary<Thread, Dispatcher> _dispatchers = new ConcurrentDictionary<Thread, Dispatcher>();

    private readonly Thread _initialThread;
    private readonly AutoResetEvent _taskAvailable = new AutoResetEvent(false);
    private readonly ConcurrentQueue<Action> _tasks = new ConcurrentQueue<Action>();

    // Written by the dispatcher thread, read by producers and by Dispose on other threads.
    private volatile bool _running;

    // default(CancellationToken) can never be cancelled, so it doubles as "no token".
    // A plain struct (instead of a nullable) avoids the HasValue/Value double read that
    // could throw when another thread cleared the token between the two reads.
    private CancellationToken _token;

    /// <summary>True when the token passed to the active <see cref="Run"/> call has been cancelled.</summary>
    public bool Cancelled => _token.IsCancellationRequested;

    /// <summary>True while the bound thread is inside <see cref="Run"/>.</summary>
    public bool Running => _running;

    /// <summary>Returns true when the calling thread is the thread this dispatcher is bound to.</summary>
    public bool CheckAccess()
    {
        return Thread.CurrentThread == _initialThread;
    }

    /// <summary>Throws unless the calling thread is the thread this dispatcher is bound to.</summary>
    /// <exception cref="InvalidOperationException">The caller is on a different thread.</exception>
    public void VerifyAccess()
    {
        if (!CheckAccess())
        {
            throw new InvalidOperationException("This method can only be called on the thread that created the dispatcher.");
        }
    }

    /// <summary>Creates a dispatcher bound to the calling thread.</summary>
    public Dispatcher() : this(Thread.CurrentThread)
    {
    }

    /// <summary>
    /// Creates a dispatcher bound to <paramref name="initialThread"/> and registers it for that
    /// thread unless another dispatcher is already registered there.
    /// </summary>
    internal Dispatcher(Thread initialThread)
    {
        _initialThread = initialThread;
        _dispatchers.GetOrAdd(initialThread, this);
    }

    /// <summary>
    /// Processes queued work on the calling thread until <paramref name="token"/> is cancelled.
    /// Work queued before the call is processed as well. Work still queued when the token is
    /// cancelled stays in the queue.
    /// </summary>
    /// <param name="token">Token that ends the loop; it is polled at least every 100 ms.</param>
    /// <exception cref="InvalidOperationException">
    /// Called from a thread other than the bound thread, or the dispatcher is already running.
    /// </exception>
    public void Run(CancellationToken token)
    {
        VerifyAccess();

        if (_running)
        {
            throw new InvalidOperationException("The dispatcher is already running.");
        }

        _token = token;
        _running = true;

        try
        {
            while (!Cancelled)
            {
                // Drain on every iteration, not only after a signal: a producer that enqueued
                // just before observing _running == true does not signal, and its work must
                // not be stranded until some later producer happens to set the event.
                while (!Cancelled && _tasks.TryDequeue(out var work))
                {
                    try
                    {
                        work();
                    }
                    catch (Exception ex)
                    {
                        // Queued wrappers route failures into their task; this is a safety net
                        // so a misbehaving item can never tear down the loop.
                        Console.WriteLine($"Exception in dispatched action: {ex}");
                    }
                }

                if (!Cancelled)
                {
                    // Sleep until new work is signalled; the timeout bounds cancellation latency.
                    _taskAvailable.WaitOne(100);
                }
            }
        }
        finally
        {
            _token = default;
            _running = false;
        }
    }

    /// <summary>
    /// Queues <paramref name="function"/> for execution on the dispatcher thread.
    /// </summary>
    /// <returns>
    /// A task that completes with the function's result, or faults with the exception it threw.
    /// The task stays pending for as long as the item has not been executed.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public Task<TResult> InvokeAsync<TResult>(Func<TResult> function)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        // Continuations must not run inline inside SetResult, otherwise code awaiting the task
        // would execute on (and could block) the dispatcher thread.
        var completion = new TaskCompletionSource<TResult>(TaskCreationOptions.RunContinuationsAsynchronously);

        Enqueue(() =>
        {
            try
            {
                completion.TrySetResult(function());
            }
            catch (Exception ex)
            {
                completion.TrySetException(ex);
            }
        });

        return completion.Task;
    }

    /// <summary>
    /// Queues a parameterless delegate of any type for execution on the dispatcher thread.
    /// </summary>
    /// <returns>
    /// A task (a <c>Task&lt;object&gt;</c> at runtime) that completes with the delegate's return
    /// value, or with null when the delegate returns nothing, or faults with the exception
    /// raised by the invocation. For delegates that are not <c>Func&lt;object&gt;</c> the call
    /// goes through <see cref="Delegate.DynamicInvoke(object[])"/>, so exceptions thrown by the
    /// delegate arrive wrapped in a <c>TargetInvocationException</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public Task InvokeAsync(Delegate action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Fast path: already the right shape, no reflection needed.
        if (action is Func<object> func)
        {
            return InvokeAsync(func);
        }

        // A null result (every Action, or a Func returning null) is a valid outcome and must
        // still complete the task; skipping it would leave callers waiting forever.
        return InvokeAsync<object?>(() => action.DynamicInvoke());
    }

    /// <summary>
    /// Waits for an active <see cref="Run"/> call to finish, unregisters this dispatcher and
    /// releases the wait handle. Do not call this from work executing inside <see cref="Run"/>:
    /// the loop cannot finish while the dispatcher thread is waiting here.
    /// </summary>
    public void Dispose()
    {
        while (_running)
        {
            Thread.Sleep(100);
        }

        // Remove the registry entry only if it still maps to this instance, so the thread can
        // be collected and later lookups do not hand out a disposed dispatcher.
        ((System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<Thread, Dispatcher>>)_dispatchers)
            .Remove(new System.Collections.Generic.KeyValuePair<Thread, Dispatcher>(_initialThread, this));

        _taskAvailable.Dispose();
    }

    // Adds a work item to the queue and wakes the loop when one is currently running.
    private void Enqueue(Action work)
    {
        _tasks.Enqueue(work);

        // When the loop is not running the item simply waits in the queue; Run drains the
        // queue on entry, so no signal is needed (and the event may already be disposed).
        if (_running && !Cancelled)
        {
            _taskAvailable.Set();
        }
    }
}
 
/// <summary>
/// Extension methods that look up the <see cref="Dispatcher"/> associated with a thread.
/// </summary>
public static class ThreadExtensions
{
    /// <summary>
    /// Returns the dispatcher registered for <paramref name="thread"/>, creating and
    /// registering one when none exists yet.
    /// </summary>
    /// <param name="thread">The thread whose dispatcher is requested.</param>
    /// <returns>The dispatcher registered for the thread; concurrent callers get the same instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="thread"/> is null.</exception>
    public static Dispatcher GetDispatcher(this Thread thread)
    {
        if (thread == null)
        {
            throw new ArgumentNullException(nameof(thread));
        }

        if (Dispatcher._dispatchers.TryGetValue(thread, out var existing))
        {
            return existing;
        }

        // Another caller may register a dispatcher between the lookup above and the
        // construction below. Always hand out whichever instance ended up in the registry,
        // otherwise the loser of the race would receive a dispatcher nobody else can find.
        var created = new Dispatcher(thread);
        var registered = Dispatcher._dispatchers.GetOrAdd(thread, created);

        if (!ReferenceEquals(registered, created))
        {
            // The spare instance was never handed out; release its wait handle.
            created.Dispose();
        }

        return registered;
    }
}