| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 | using Microsoft.AspNet.SignalR.Client;using System.Collections.Concurrent;using System.Threading;using System.Threading.Tasks.Dataflow;namespace Communicator{    public class UISignalRClient : ICommunicator    {        private string _connectionUrl;        private string _hubName;        private HubConnection? _hubConnection;        private IHubProxy? _hubProxy;        private readonly BufferBlock<KeyValuePair<string, object>> _changedDataItems;        private readonly CancellationTokenSource _cancellationTokenSource;        private readonly ConcurrentDictionary<string,List<Action<object>>> _subscribedKeys;        private bool disposedValue;        public UISignalRClient()        {            _connectionUrl = "http://localhost:9999/NewUI";            _hubName = "UIHub";            _changedDataItems = new BufferBlock<KeyValuePair<string, object>>();            _cancellationTokenSource = new CancellationTokenSource();            _subscribedKeys = new ConcurrentDictionary<string, List<Action<object>>>();            Task.Run(() =>            {                ProcessReceivedData();            });        }        public bool Initialize()        {            try            {                if (_hubConnection is not null)                {                    return true;                }                _hubConnection = new HubConnection(_connectionUrl);                _hubConnection.StateChanged += OnStateChanged;                _hubProxy = _hubConnection.CreateHubProxy(_hubName);                _hubProxy.On<Dictionary<string, object>>("ReceiveChangedDataItems", items =>                {                    if(items.Count<=0)                    {                        return;                    }                    foreach (var item in items)                    {                        _changedDataItems.Post(item);                    }                });                return true;            }            catch (Exception)            {                return false;            }        }        public void SubscribeDataItem(string dataKey, Action<object> callback)        {            if(!_subscribedKeys.ContainsKey(dataKey))            {                _subscribedKeys.TryAdd(dataKey, new List<Action<object>>());            }            _subscribedKeys[dataKey].Add(callback);        }        public void UnsubscribeDataItem(string dataKey, Action<object> callback)        {            if( _subscribedKeys.ContainsKey(dataKey))            {                _subscribedKeys[dataKey].Remove(callback);            }        }        private async void ProcessReceivedData()        {            while (await _changedDataItems.OutputAvailableAsync() && !_cancellationTokenSource.IsCancellationRequested)            {                var item = await _changedDataItems.ReceiveAsync();                if (!_subscribedKeys.ContainsKey(item.Key))                {                    continue;                }                foreach (var action in _subscribedKeys[item.Key])                {                    action?.Invoke(item.Value);                }            }        }        private void OnStateChanged(StateChange obj)        {            //TODO:        }        protected virtual void Dispose(bool disposing)        {            if (!disposedValue)            {                if (disposing)                {                    // TODO: dispose managed state (managed objects)                    if (_hubConnection is not null)                    {                        _hubConnection.StateChanged -= OnStateChanged;                        _hubConnection.Dispose();                        _changedDataItems.Complete();                        _cancellationTokenSource.Cancel();                        _cancellationTokenSource.Dispose();                    }                }                // TODO: free unmanaged resources (unmanaged objects) and override finalizer                // TODO: set large fields to null                disposedValue = true;            }        }        // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources        // ~SignalRClient()        // {        //     // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method        //     Dispose(disposing: false);        // }        public void Dispose()        {            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method            Dispose(disposing: true);            GC.SuppressFinalize(this);        }    }}
 |