123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- namespace Universal;
/// <summary>
/// A first-in/first-out work queue whose items are handed, one at a time, to a
/// handler running on a single dedicated background worker.
/// </summary>
/// <remarks>
/// All mutable state (the queue, the pause flag and the disposed flag) is guarded by
/// one monitor, <c>_sync</c>. The worker sleeps in <see cref="Monitor.Wait(object)"/>
/// and is woken by <see cref="Enqueue"/>, <see cref="Resume"/> and
/// <see cref="Dispose()"/>, so a wake-up can never be lost and disposing always lets
/// the worker exit.
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class EventQueue<T> : IDisposable
{
    /// <summary>
    /// Creates the queue and immediately starts its background worker.
    /// </summary>
    /// <param name="queueHandler">Callback invoked on the worker for every dequeued item.</param>
    /// <param name="timeoutMillionSecond">
    /// Time in milliseconds an item may spend inside <paramref name="queueHandler"/> before
    /// <paramref name="timeoutHandler"/> is raised; <see cref="Timeout.Infinite"/> (-1) disables the watchdog.
    /// </param>
    /// <param name="timeoutHandler">
    /// Optional callback raised on a timer thread with the item whose handler overran.
    /// The running handler is not interrupted.
    /// </param>
    /// <exception cref="ArgumentException"><paramref name="timeoutMillionSecond"/> is less than -1.</exception>
    public EventQueue(Action<T> queueHandler, int timeoutMillionSecond = Timeout.Infinite, Action<T>? timeoutHandler = null)
    {
        if (timeoutMillionSecond < -1)
            throw new ArgumentException($"Invalid timeout time {timeoutMillionSecond}");
        this._queueHandler = queueHandler;
        this._timeoutHandler = timeoutHandler;
        this._timeoutMillionSeconds = timeoutMillionSecond;
        // Pin the default scheduler: the scheduler-less overload uses TaskScheduler.Current,
        // which may be a UI or otherwise restricted scheduler when constructed inside a task.
        _ = Task.Factory.StartNew(WorkingTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Guards _queue, _paused and disposedValue; also the monitor the worker waits on.
    private readonly object _sync = new();
    private readonly Queue<T> _queue = new();
    private readonly Action<T>? _queueHandler;
    private readonly Action<T>? _timeoutHandler;
    private readonly int _timeoutMillionSeconds;
    private bool _paused;
    private bool disposedValue;

    /// <summary>Number of items currently waiting to be processed.</summary>
    public int Count
    {
        get
        {
            lock (this._sync)
            {
                return this._queue.Count;
            }
        }
    }

    /// <summary>
    /// Stops the worker from taking further items. An item already being handled
    /// runs to completion; items may still be enqueued while paused.
    /// </summary>
    public void Pause()
    {
        lock (this._sync)
        {
            this._paused = true;
        }
    }

    /// <summary>Lets the worker continue taking items after <see cref="Pause"/>.</summary>
    public void Resume()
    {
        lock (this._sync)
        {
            this._paused = false;
            Monitor.PulseAll(this._sync);
        }
    }

    /// <summary>Appends an item to the queue and wakes the worker.</summary>
    /// <param name="input">Item to process.</param>
    /// <returns>
    /// <c>true</c> if the item was queued; <c>false</c> if the queue has been disposed
    /// and the item was therefore dropped.
    /// </returns>
    public bool Enqueue(T input)
    {
        lock (this._sync)
        {
            if (this.disposedValue)
                return false;
            this._queue.Enqueue(input);
            Monitor.PulseAll(this._sync);
            return true;
        }
    }

    /// <summary>Worker loop: waits for an item, processes it, repeats until disposed.</summary>
    private void WorkingTask()
    {
        while (true)
        {
            T content;
            lock (this._sync)
            {
                // Sleep while there is nothing to do. Enqueue, Resume and Dispose all pulse
                // the monitor under the same lock, so a signal cannot slip past this check.
                while (!this.disposedValue && (this._paused || this._queue.Count == 0))
                    Monitor.Wait(this._sync);
                if (this.disposedValue)
                    return;
                content = this._queue.Dequeue();
            }
            // Run the handler outside the lock so producers are never blocked by slow handlers.
            this.HandleItem(content);
        }
    }

    /// <summary>Invokes the queue handler for one item, guarded by the optional timeout watchdog.</summary>
    private void HandleItem(T content)
    {
        // The timer is local to this item so a late callback can never dispose the
        // watchdog belonging to a later item.
        Timer? watchdog = null;
        try
        {
            if (this._timeoutMillionSeconds != Timeout.Infinite)
                watchdog = new Timer(TimeoutCallBack, content, this._timeoutMillionSeconds, Timeout.Infinite);
            this._queueHandler?.Invoke(content);
        }
        catch
        {
            // Deliberate best effort: a failing handler must not kill the worker,
            // otherwise every later item would silently stay unprocessed.
        }
        finally
        {
            watchdog?.Dispose();
        }
    }

    /// <summary>
    /// Timer callback raised when an item's handler exceeded the timeout.
    /// A <c>null</c> item does not match <typeparamref name="T"/> and is ignored.
    /// </summary>
    private void TimeoutCallBack(object? state)
    {
        if (state is T content)
            this._timeoutHandler?.Invoke(content);
    }

    #region Dispose
    /// <summary>
    /// Marks the queue as disposed, discards pending items and wakes the worker so it can exit.
    /// An item that is currently being handled is allowed to finish.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from a
    /// derived type's finalizer, in which case no managed state is touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing)
            return;
        lock (this._sync)
        {
            if (this.disposedValue)
                return;
            this.disposedValue = true;
            this._queue.Clear();
            Monitor.PulseAll(this._sync);
        }
    }

    // No finalizer: the class owns no unmanaged resources, and the worker's delegate keeps
    // the instance reachable until Dispose() is called, so a finalizer could never help.

    /// <summary>Stops the worker and drops all pending items. Safe to call more than once.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        // Kept for derived types that declare a finalizer.
        GC.SuppressFinalize(this);
    }
    #endregion
}
/// <summary>
/// A priority-ordered work queue whose items are handed, one at a time, to a handler
/// running on a single dedicated background worker.
/// </summary>
/// <remarks>
/// <para>
/// Items are taken in the order defined by <see cref="PriorityQueue{TElement,TPriority}"/>:
/// the item with the smallest priority value is dequeued first, and the relative order of
/// items with equal priority is not guaranteed.
/// </para>
/// <para>
/// <see cref="PriorityQueue{TElement,TPriority}"/> is not thread-safe, so every access to it
/// (together with the pause and disposed flags) is guarded by one monitor, <c>_sync</c>.
/// The worker sleeps in <see cref="Monitor.Wait(object)"/> and is woken by
/// <see cref="Enqueue"/>, <see cref="Resume"/> and <see cref="Dispose()"/>.
/// </para>
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class PriorityEventQueue<T> : IDisposable
{
    /// <summary>
    /// Creates the queue and immediately starts its background worker.
    /// </summary>
    /// <param name="queueHandler">Callback invoked on the worker for every dequeued item.</param>
    /// <param name="timeoutMillionSecond">
    /// Time in milliseconds an item may spend inside <paramref name="queueHandler"/> before
    /// <paramref name="timeoutHandler"/> is raised; <see cref="Timeout.Infinite"/> (-1) disables the watchdog.
    /// </param>
    /// <param name="timeoutHandler">
    /// Optional callback raised on a timer thread with the item whose handler overran.
    /// The running handler is not interrupted.
    /// </param>
    /// <exception cref="ArgumentException"><paramref name="timeoutMillionSecond"/> is less than -1.</exception>
    public PriorityEventQueue(Action<T> queueHandler, int timeoutMillionSecond = Timeout.Infinite, Action<T>? timeoutHandler = null)
    {
        if (timeoutMillionSecond < -1)
            throw new ArgumentException($"Invalid timeout time {timeoutMillionSecond}");
        this._queueHandler = queueHandler;
        this._timeoutHandler = timeoutHandler;
        this._timeoutMillionSeconds = timeoutMillionSecond;
        // Pin the default scheduler: the scheduler-less overload uses TaskScheduler.Current,
        // which may be a UI or otherwise restricted scheduler when constructed inside a task.
        _ = Task.Factory.StartNew(WorkingTask, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Guards _queue, _paused and disposedValue; also the monitor the worker waits on.
    private readonly object _sync = new();
    private readonly PriorityQueue<T, int> _queue = new();
    private readonly Action<T>? _queueHandler;
    private readonly Action<T>? _timeoutHandler;
    private readonly int _timeoutMillionSeconds;
    private bool _paused;
    private bool disposedValue;

    /// <summary>Number of items currently waiting to be processed.</summary>
    public int Count
    {
        get
        {
            lock (this._sync)
            {
                return this._queue.Count;
            }
        }
    }

    /// <summary>
    /// Stops the worker from taking further items. An item already being handled
    /// runs to completion; items may still be enqueued while paused.
    /// </summary>
    public void Pause()
    {
        lock (this._sync)
        {
            this._paused = true;
        }
    }

    /// <summary>Lets the worker continue taking items after <see cref="Pause"/>.</summary>
    public void Resume()
    {
        lock (this._sync)
        {
            this._paused = false;
            Monitor.PulseAll(this._sync);
        }
    }

    /// <summary>Adds an item with the given priority and wakes the worker.</summary>
    /// <param name="input">Item to process.</param>
    /// <param name="priority">Ordering key; smaller values are processed first.</param>
    /// <returns>
    /// <c>true</c> if the item was queued; <c>false</c> if the queue has been disposed
    /// and the item was therefore dropped.
    /// </returns>
    public bool Enqueue(T input, int priority)
    {
        lock (this._sync)
        {
            if (this.disposedValue)
                return false;
            this._queue.Enqueue(input, priority);
            Monitor.PulseAll(this._sync);
            return true;
        }
    }

    /// <summary>Worker loop: waits for an item, processes it, repeats until disposed.</summary>
    private void WorkingTask()
    {
        while (true)
        {
            T content;
            lock (this._sync)
            {
                // Sleep while there is nothing to do. Checking Count under the lock replaces
                // the former "catch the exception from Dequeue()" emptiness test, and because
                // Enqueue, Resume and Dispose pulse under the same lock no signal can be lost.
                while (!this.disposedValue && (this._paused || this._queue.Count == 0))
                    Monitor.Wait(this._sync);
                if (this.disposedValue)
                    return;
                content = this._queue.Dequeue();
            }
            // Run the handler outside the lock so producers are never blocked by slow handlers.
            this.HandleItem(content);
        }
    }

    /// <summary>Invokes the queue handler for one item, guarded by the optional timeout watchdog.</summary>
    private void HandleItem(T content)
    {
        // The timer is local to this item so a late callback can never dispose the
        // watchdog belonging to a later item.
        Timer? watchdog = null;
        try
        {
            if (this._timeoutMillionSeconds != Timeout.Infinite)
                watchdog = new Timer(TimeoutCallBack, content, this._timeoutMillionSeconds, Timeout.Infinite);
            this._queueHandler?.Invoke(content);
        }
        catch
        {
            // Deliberate best effort: a failing handler must not kill the worker,
            // otherwise every later item would silently stay unprocessed.
        }
        finally
        {
            watchdog?.Dispose();
        }
    }

    /// <summary>
    /// Timer callback raised when an item's handler exceeded the timeout.
    /// A <c>null</c> item does not match <typeparamref name="T"/> and is ignored.
    /// </summary>
    private void TimeoutCallBack(object? state)
    {
        if (state is T content)
            this._timeoutHandler?.Invoke(content);
    }

    #region Dispose
    /// <summary>
    /// Marks the queue as disposed, discards pending items and wakes the worker so it can exit.
    /// An item that is currently being handled is allowed to finish.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from a
    /// derived type's finalizer, in which case no managed state is touched.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing)
            return;
        lock (this._sync)
        {
            if (this.disposedValue)
                return;
            this.disposedValue = true;
            this._queue.Clear();
            Monitor.PulseAll(this._sync);
        }
    }

    // No finalizer: the class owns no unmanaged resources, and the worker's delegate keeps
    // the instance reachable until Dispose() is called, so a finalizer could never help.

    /// <summary>Stops the worker and drops all pending items. Safe to call more than once.</summary>
    public void Dispose()
    {
        Dispose(disposing: true);
        // Kept for derived types that declare a finalizer.
        GC.SuppressFinalize(this);
    }
    #endregion
}
|