| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 | namespace Universal;public class EventQueue<T> : IDisposable{    public EventQueue(Action<T> queueHandler, int timeoutMillionSecond = Timeout.Infinite, Action<T>? timeoutHandler = null)    {        if (timeoutMillionSecond < -1)            throw new ArgumentException($"Invalid timeout time {timeoutMillionSecond}");        this._queueHandler = queueHandler;        this._timeoutHandler = timeoutHandler;        this._timeoutMillionSeconds = timeoutMillionSecond;        Task.Factory.StartNew(WorkingTask, TaskCreationOptions.LongRunning);    }    private readonly ConcurrentQueue<T> _queue = [];    private readonly Action<T>? _queueHandler;    private CancellationToken _cancellation = new(false);    private readonly AutoResetEvent _queueEvent = new(false);    private readonly ManualResetEvent _pauseController = new(true);    private Timer? _timeoutTimer;    private bool disposedValue;    private readonly int _timeoutMillionSeconds;    private readonly Action<T>? _timeoutHandler;    public T? LastItem { get; private set; }    public int Count { get => this._queue.Count; }    public void Pause() => this._pauseController.Reset();    public void Resume() => this._pauseController.Set();    public bool Enqueue(T input)    {        if (this._queue is null)            return false;        this.LastItem = input;        this._queue.Enqueue(input);        this._queueEvent.Set();        return true;    }    private void WorkingTask()    {        for (T? content; !this._cancellation.IsCancellationRequested; this._pauseController.WaitOne(-1))        {            if (!this._queue.TryDequeue(out content))            {                this._queueEvent.Reset();                this._queueEvent.WaitOne(-1);                continue;            }            if (!this._timeoutMillionSeconds.Equals(Timeout.Infinite))                this._timeoutTimer = new(TimeoutCallBack, content, this._timeoutMillionSeconds, int.MaxValue);            try            {                this._queueHandler?.Invoke(content);            }            catch            {            }                  this._timeoutTimer?.Dispose();        }        this._queue.Clear();    }    private void TimeoutCallBack(object? state)    {        this._timeoutTimer?.Dispose();        if (state is not T content)            return;        this._timeoutHandler?.Invoke(content);    }    #region Dispose    protected virtual void Dispose(bool disposing)    {        if (!disposedValue)        {            if (disposing)            {                // TODO: dispose managed state (managed objects)                this._cancellation = new(true);            }            // TODO: free unmanaged resources (unmanaged objects) and override finalizer            // TODO: set large fields to null            disposedValue = true;        }    }    // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources    ~EventQueue()    {        // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method        Dispose(disposing: false);    }    public void Dispose()    {        // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method        Dispose(disposing: true);        GC.SuppressFinalize(this);    }    #endregion}
 |