using Microsoft.AspNet.SignalR.Client;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Communicator
{
    /// <summary>
    /// SignalR-based <see cref="ICommunicator"/> that receives changed data items from a hub,
    /// queues them, and dispatches each one on a background task to the callbacks subscribed
    /// to the item's key.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments were missing from the source this file was
    /// restored from. They are reconstructed here as <c>KeyValuePair&lt;string, object&gt;</c> items and
    /// <c>Action&lt;object&gt;</c> callbacks, inferred from how <c>Key</c>/<c>Value</c> are used — confirm
    /// against the <see cref="ICommunicator"/> declaration and the hub's payload type.
    /// </remarks>
    public class UISignalRClient : ICommunicator
    {
        private readonly string _connectionUrl;
        private readonly string _hubName;
        private HubConnection? _hubConnection;
        private IHubProxy? _hubProxy;
        private readonly BufferBlock<KeyValuePair<string, object>> _changedDataItems;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly ConcurrentDictionary<string, List<Action<object>>> _subscribedKeys;

        // Guards every read/write of the callback lists stored in _subscribedKeys; the lists
        // are touched by subscribers and by the background dispatch loop concurrently.
        private readonly object _subscriptionGate = new();

        private bool _disposedValue;

        /// <summary>
        /// Creates the client with its default endpoint and hub name and starts the background
        /// loop that dispatches queued items to subscribers.
        /// </summary>
        public UISignalRClient()
        {
            _connectionUrl = "http://localhost:9999/NewUI";
            _hubName = "UIHub";
            _changedDataItems = new BufferBlock<KeyValuePair<string, object>>();
            _cancellationTokenSource = new CancellationTokenSource();
            _subscribedKeys = new ConcurrentDictionary<string, List<Action<object>>>();

            // Capture the token up front: CancellationTokenSource.Token throws once the source
            // has been disposed, and Dispose may run while the loop is still alive.
            var cancellationToken = _cancellationTokenSource.Token;
            _ = Task.Run(() => ProcessReceivedDataAsync(cancellationToken));
        }

        /// <summary>
        /// Creates the hub connection and proxy and registers the "ReceiveChangedDataItems" handler.
        /// Calling it again after a successful initialization is a no-op.
        /// </summary>
        /// <returns><c>true</c> if the connection objects exist after the call; <c>false</c> if setup threw.</returns>
        public bool Initialize()
        {
            if (_hubConnection is not null)
            {
                return true;
            }

            // NOTE(review): nothing in this class ever starts the connection, so the handler
            // below cannot fire unless the connection is started elsewhere — confirm.
            HubConnection? connection = null;
            try
            {
                connection = new HubConnection(_connectionUrl);
                connection.StateChanged += OnStateChanged;

                var proxy = connection.CreateHubProxy(_hubName);
                proxy.On<List<KeyValuePair<string, object>>>("ReceiveChangedDataItems", items =>
                {
                    if (items is null || items.Count == 0)
                    {
                        return;
                    }

                    foreach (var item in items)
                    {
                        _changedDataItems.Post(item);
                    }
                });

                // Publish the fields only once setup has fully succeeded, so a failed attempt
                // does not make the next Initialize call report success.
                _hubProxy = proxy;
                _hubConnection = connection;
                return true;
            }
            catch (Exception)
            {
                if (connection is not null)
                {
                    connection.StateChanged -= OnStateChanged;
                    connection.Dispose();
                }

                return false;
            }
        }

        /// <summary>
        /// Registers <paramref name="callback"/> to be invoked with the value of every received
        /// item whose key equals <paramref name="dataKey"/>.
        /// </summary>
        /// <param name="dataKey">Key of the data item to observe.</param>
        /// <param name="callback">Callback receiving the item's value.</param>
        public void SubscribeDataItem(string dataKey, Action<object> callback)
        {
            var callbacks = _subscribedKeys.GetOrAdd(dataKey, _ => new List<Action<object>>());
            lock (_subscriptionGate)
            {
                callbacks.Add(callback);
            }
        }

        /// <summary>
        /// Removes one registration of <paramref name="callback"/> for <paramref name="dataKey"/>;
        /// does nothing if the key has no subscribers.
        /// </summary>
        /// <param name="dataKey">Key the callback was subscribed under.</param>
        /// <param name="callback">Callback to remove.</param>
        public void UnsubscribeDataItem(string dataKey, Action<object> callback)
        {
            if (_subscribedKeys.TryGetValue(dataKey, out var callbacks))
            {
                lock (_subscriptionGate)
                {
                    callbacks.Remove(callback);
                }
            }
        }

        /// <summary>
        /// Background loop: waits for queued items and dispatches them until the buffer is
        /// completed or <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <param name="cancellationToken">Token cancelled by <see cref="Dispose()"/>.</param>
        private async Task ProcessReceivedDataAsync(CancellationToken cancellationToken)
        {
            try
            {
                while (await _changedDataItems.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
                {
                    if (_changedDataItems.TryReceive(out var item))
                    {
                        Dispatch(item);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected shutdown path: Dispose cancelled the token.
            }
        }

        /// <summary>
        /// Invokes every callback currently subscribed to the item's key with the item's value.
        /// </summary>
        /// <param name="item">Received key/value pair.</param>
        private void Dispatch(KeyValuePair<string, object> item)
        {
            if (item.Key is null || !_subscribedKeys.TryGetValue(item.Key, out var callbacks))
            {
                return;
            }

            // Iterate over a snapshot so callbacks may subscribe/unsubscribe while being invoked.
            Action<object>[] snapshot;
            lock (_subscriptionGate)
            {
                snapshot = callbacks.ToArray();
            }

            foreach (var callback in snapshot)
            {
                try
                {
                    callback?.Invoke(item.Value);
                }
                catch (Exception ex)
                {
                    // One faulty subscriber must not stop delivery to the others or end the loop.
                    System.Diagnostics.Trace.TraceError(
                        "UISignalRClient: subscriber for '{0}' threw: {1}", item.Key, ex);
                }
            }
        }

        /// <summary>Handler for the hub connection's state-change event.</summary>
        /// <param name="obj">The state transition reported by the connection.</param>
        private void OnStateChanged(StateChange obj)
        {
            // TODO: handle connection state changes.
        }

        /// <summary>
        /// Releases the hub connection (if one was created), completes the item buffer and
        /// cancels the background dispatch loop.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposedValue)
            {
                return;
            }

            if (disposing)
            {
                if (_hubConnection is not null)
                {
                    _hubConnection.StateChanged -= OnStateChanged;
                    _hubConnection.Dispose();
                }

                // The buffer and the dispatch loop are created in the constructor, so they must
                // be shut down even when Initialize was never called.
                _changedDataItems.Complete();
                _cancellationTokenSource.Cancel();
                _cancellationTokenSource.Dispose();
            }

            _disposedValue = true;
        }

        /// <summary>Disposes the client. Safe to call more than once.</summary>
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }
}