namespace Universal;

/// <summary>
/// A first-in/first-out work queue that passes every enqueued item to a handler on a single
/// dedicated background worker.
/// </summary>
/// <remarks>
/// <para>Items are handled one at a time, in the order they were enqueued.</para>
/// <para>
/// Exceptions thrown by the queue handler are swallowed so that one failing item does not stop
/// the worker. The optional timeout handler is invoked from a <see cref="Timer"/> callback when
/// the queue handler is still running after the configured number of milliseconds; the queue
/// handler itself is not interrupted.
/// </para>
/// <para>
/// <see cref="Dispose()"/> signals the worker to stop. The wait handles are intentionally not
/// disposed there, because the worker and concurrent callers of <see cref="Enqueue"/>,
/// <see cref="Pause"/> and <see cref="Resume"/> may still be using them.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class EventQueue<T> : IDisposable
{
    private readonly ConcurrentQueue<T> _queue = new();
    private readonly Action<T>? _queueHandler;
    private readonly Action<T>? _timeoutHandler;
    private readonly int _timeoutMillionSeconds;

    // Cancelled by Dispose; its wait handle lets every blocking wait in the worker be interrupted.
    private readonly CancellationTokenSource _cancellation = new();

    // Signalled by Enqueue to wake a worker that found the queue empty.
    private readonly AutoResetEvent _queueEvent = new(false);

    // Signalled while the queue is running, reset while it is paused.
    private readonly ManualResetEvent _pauseController = new(true);

    private bool _disposed;

    /// <summary>
    /// Creates the queue and starts its background worker.
    /// </summary>
    /// <param name="queueHandler">Callback invoked on the worker for every dequeued item.</param>
    /// <param name="timeoutMillionSecond">
    /// Time in milliseconds after which <paramref name="timeoutHandler"/> is invoked for an item
    /// whose handler is still running, or <see cref="Timeout.Infinite"/> to disable the timeout.
    /// </param>
    /// <param name="timeoutHandler">Optional callback invoked with the item that timed out.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="timeoutMillionSecond"/> is less than -1.
    /// </exception>
    public EventQueue(Action<T> queueHandler, int timeoutMillionSecond = Timeout.Infinite, Action<T>? timeoutHandler = null)
    {
        if (timeoutMillionSecond < Timeout.Infinite)
        {
            throw new ArgumentOutOfRangeException(
                nameof(timeoutMillionSecond),
                timeoutMillionSecond,
                $"Invalid timeout time {timeoutMillionSecond}");
        }

        this._queueHandler = queueHandler;
        this._timeoutHandler = timeoutHandler;
        this._timeoutMillionSeconds = timeoutMillionSecond;

        // Pin the scheduler explicitly: the two-argument StartNew overload would use
        // TaskScheduler.Current, which is whatever scheduler the caller happens to run on.
        _ = Task.Factory.StartNew(
            this.WorkingTask,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>Gets the number of items currently waiting in the queue.</summary>
    public int Count => this._queue.Count;

    /// <summary>Stops the worker from dequeuing further items until <see cref="Resume"/> is called.</summary>
    public void Pause() => this._pauseController.Reset();

    /// <summary>Lets the worker continue dequeuing after a call to <see cref="Pause"/>.</summary>
    public void Resume() => this._pauseController.Set();

    /// <summary>
    /// Adds an item to the end of the queue and wakes the worker.
    /// </summary>
    /// <param name="input">The item to enqueue.</param>
    /// <returns>
    /// <see langword="true"/> when the item was queued; <see langword="false"/> when the queue
    /// has been disposed and the item was therefore rejected.
    /// </returns>
    public bool Enqueue(T input)
    {
        if (this._cancellation.IsCancellationRequested)
        {
            return false;
        }

        this._queue.Enqueue(input);
        this._queueEvent.Set();
        return true;
    }

    /// <summary>
    /// Worker loop: waits until the queue is not paused, dequeues one item, handles it and
    /// repeats until the queue is disposed. Remaining items are discarded on exit.
    /// </summary>
    private void WorkingTask()
    {
        CancellationToken token = this._cancellation.Token;
        WaitHandle[] resumedOrCancelled = [this._pauseController, token.WaitHandle];
        WaitHandle[] signalledOrCancelled = [this._queueEvent, token.WaitHandle];

        while (true)
        {
            // Blocks while paused; cancellation releases the wait as well.
            WaitHandle.WaitAny(resumedOrCancelled);
            if (token.IsCancellationRequested)
            {
                break;
            }

            if (!this._queue.TryDequeue(out T? content))
            {
                // No Reset() before waiting: a Set() issued after the failed dequeue must
                // stay latched, otherwise the item that caused it would never be picked up.
                WaitHandle.WaitAny(signalledOrCancelled);
                continue;
            }

            this.Process(content);
        }

        this._queue.Clear();
    }

    /// <summary>Runs the queue handler for one item, guarded by the optional timeout timer.</summary>
    private void Process(T content)
    {
        // The timer is local to this item so that a late callback can never dispose the
        // timer belonging to a following item.
        Timer? timeoutTimer = null;
        if (this._timeoutMillionSeconds != Timeout.Infinite && this._timeoutHandler is not null)
        {
            timeoutTimer = new Timer(this.TimeoutCallBack, content, this._timeoutMillionSeconds, Timeout.Infinite);
        }

        try
        {
            this._queueHandler?.Invoke(content);
        }
        catch (Exception)
        {
            // Deliberate best effort: a failing handler must not terminate the worker.
        }
        finally
        {
            timeoutTimer?.Dispose();
        }
    }

    /// <summary>Timer callback: forwards the item that timed out to the timeout handler.</summary>
    private void TimeoutCallBack(object? state)
    {
        if (state is T content)
        {
            this._timeoutHandler?.Invoke(content);
        }
    }

    #region Dispose

    /// <summary>
    /// Stops the worker when called from <see cref="Dispose()"/>. Subsequent calls do nothing.
    /// </summary>
    /// <param name="disposing">
    /// <see langword="true"/> when called from <see cref="Dispose()"/>;
    /// <see langword="false"/> when called from the finalizer.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (this._disposed)
        {
            return;
        }

        this._disposed = true;

        if (disposing)
        {
            // Wakes the worker out of any wait and makes it leave its loop.
            this._cancellation.Cancel();
        }
    }

    /// <summary>Finalizer kept so that derived types can release unmanaged state in <see cref="Dispose(bool)"/>.</summary>
    ~EventQueue()
    {
        Dispose(disposing: false);
    }

    /// <summary>Stops the background worker. Items still queued are discarded by the worker.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    #endregion
}

/// <summary>
/// A work queue that passes enqueued items to a handler on a single dedicated background
/// worker, taking the item with the lowest priority value first.
/// </summary>
/// <remarks>
/// <para>
/// The underlying <see cref="PriorityQueue{TElement, TPriority}"/> is not thread-safe, so every
/// access to it is serialized with a private lock.
/// </para>
/// <para>
/// Exceptions thrown by the queue handler are swallowed so that one failing item does not stop
/// the worker. The optional timeout handler is invoked from a <see cref="Timer"/> callback when
/// the queue handler is still running after the configured number of milliseconds; the queue
/// handler itself is not interrupted.
/// </para>
/// <para>
/// <see cref="Dispose()"/> signals the worker to stop. The wait handles are intentionally not
/// disposed there, because the worker and concurrent callers of <see cref="Enqueue"/>,
/// <see cref="Pause"/> and <see cref="Resume"/> may still be using them.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class PriorityEventQueue<T> : IDisposable
{
    private readonly PriorityQueue<T, int> _queue = new();

    // Guards every access to _queue.
    private readonly object _gate = new();

    private readonly Action<T>? _queueHandler;
    private readonly Action<T>? _timeoutHandler;
    private readonly int _timeoutMillionSeconds;

    // Cancelled by Dispose; its wait handle lets every blocking wait in the worker be interrupted.
    private readonly CancellationTokenSource _cancellation = new();

    // Signalled by Enqueue to wake a worker that found the queue empty.
    private readonly AutoResetEvent _queueEvent = new(false);

    // Signalled while the queue is running, reset while it is paused.
    private readonly ManualResetEvent _pauseController = new(true);

    private bool _disposed;

    /// <summary>
    /// Creates the queue and starts its background worker.
    /// </summary>
    /// <param name="queueHandler">Callback invoked on the worker for every dequeued item.</param>
    /// <param name="timeoutMillionSecond">
    /// Time in milliseconds after which <paramref name="timeoutHandler"/> is invoked for an item
    /// whose handler is still running, or <see cref="Timeout.Infinite"/> to disable the timeout.
    /// </param>
    /// <param name="timeoutHandler">Optional callback invoked with the item that timed out.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="timeoutMillionSecond"/> is less than -1.
    /// </exception>
    public PriorityEventQueue(Action<T> queueHandler, int timeoutMillionSecond = Timeout.Infinite, Action<T>? timeoutHandler = null)
    {
        if (timeoutMillionSecond < Timeout.Infinite)
        {
            throw new ArgumentOutOfRangeException(
                nameof(timeoutMillionSecond),
                timeoutMillionSecond,
                $"Invalid timeout time {timeoutMillionSecond}");
        }

        this._queueHandler = queueHandler;
        this._timeoutHandler = timeoutHandler;
        this._timeoutMillionSeconds = timeoutMillionSecond;

        // Pin the scheduler explicitly: the two-argument StartNew overload would use
        // TaskScheduler.Current, which is whatever scheduler the caller happens to run on.
        _ = Task.Factory.StartNew(
            this.WorkingTask,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>Gets the number of items currently waiting in the queue.</summary>
    public int Count
    {
        get
        {
            lock (this._gate)
            {
                return this._queue.Count;
            }
        }
    }

    /// <summary>Stops the worker from dequeuing further items until <see cref="Resume"/> is called.</summary>
    public void Pause() => this._pauseController.Reset();

    /// <summary>Lets the worker continue dequeuing after a call to <see cref="Pause"/>.</summary>
    public void Resume() => this._pauseController.Set();

    /// <summary>
    /// Adds an item with the given priority and wakes the worker.
    /// </summary>
    /// <param name="input">The item to enqueue.</param>
    /// <param name="priority">Priority of the item; lower values are dequeued first.</param>
    /// <returns>
    /// <see langword="true"/> when the item was queued; <see langword="false"/> when the queue
    /// has been disposed and the item was therefore rejected.
    /// </returns>
    public bool Enqueue(T input, int priority)
    {
        if (this._cancellation.IsCancellationRequested)
        {
            return false;
        }

        lock (this._gate)
        {
            this._queue.Enqueue(input, priority);
        }

        this._queueEvent.Set();
        return true;
    }

    /// <summary>
    /// Worker loop: waits until the queue is not paused, dequeues the item with the lowest
    /// priority value, handles it and repeats until the queue is disposed. Remaining items are
    /// discarded on exit.
    /// </summary>
    private void WorkingTask()
    {
        CancellationToken token = this._cancellation.Token;
        WaitHandle[] resumedOrCancelled = [this._pauseController, token.WaitHandle];
        WaitHandle[] signalledOrCancelled = [this._queueEvent, token.WaitHandle];

        while (true)
        {
            // Blocks while paused; cancellation releases the wait as well.
            WaitHandle.WaitAny(resumedOrCancelled);
            if (token.IsCancellationRequested)
            {
                break;
            }

            bool dequeued;
            T? content;
            lock (this._gate)
            {
                // TryDequeue instead of Dequeue: an empty queue is expected, not exceptional.
                dequeued = this._queue.TryDequeue(out content, out _);
            }

            if (!dequeued)
            {
                // No Reset() before waiting: a Set() issued after the failed dequeue must
                // stay latched, otherwise the item that caused it would never be picked up.
                WaitHandle.WaitAny(signalledOrCancelled);
                continue;
            }

            this.Process(content!);
        }

        lock (this._gate)
        {
            this._queue.Clear();
        }
    }

    /// <summary>Runs the queue handler for one item, guarded by the optional timeout timer.</summary>
    private void Process(T content)
    {
        // The timer is local to this item so that a late callback can never dispose the
        // timer belonging to a following item.
        Timer? timeoutTimer = null;
        if (this._timeoutMillionSeconds != Timeout.Infinite && this._timeoutHandler is not null)
        {
            timeoutTimer = new Timer(this.TimeoutCallBack, content, this._timeoutMillionSeconds, Timeout.Infinite);
        }

        try
        {
            this._queueHandler?.Invoke(content);
        }
        catch (Exception)
        {
            // Deliberate best effort: a failing handler must not terminate the worker.
        }
        finally
        {
            timeoutTimer?.Dispose();
        }
    }

    /// <summary>Timer callback: forwards the item that timed out to the timeout handler.</summary>
    private void TimeoutCallBack(object? state)
    {
        if (state is T content)
        {
            this._timeoutHandler?.Invoke(content);
        }
    }

    #region Dispose

    /// <summary>
    /// Stops the worker when called from <see cref="Dispose()"/>. Subsequent calls do nothing.
    /// </summary>
    /// <param name="disposing">
    /// <see langword="true"/> when called from <see cref="Dispose()"/>;
    /// <see langword="false"/> when called from the finalizer.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (this._disposed)
        {
            return;
        }

        this._disposed = true;

        if (disposing)
        {
            // Wakes the worker out of any wait and makes it leave its loop.
            this._cancellation.Cancel();
        }
    }

    /// <summary>Finalizer kept so that derived types can release unmanaged state in <see cref="Dispose(bool)"/>.</summary>
    ~PriorityEventQueue()
    {
        Dispose(disposing: false);
    }

    /// <summary>Stops the background worker. Items still queued are discarded by the worker.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    #endregion
}