|
|
@@ -15,7 +15,7 @@ namespace Communicator
|
|
|
private readonly BufferBlock<KeyValuePair<string, object>> _changedDataItems;
|
|
|
private readonly CancellationTokenSource _cancellationTokenSource;
|
|
|
|
|
|
- private readonly ConcurrentDictionary<string,List<Action<object>>> _subscribedKeys;
|
|
|
+ private ICommunicatorProvider? _provider;
|
|
|
|
|
|
private bool disposedValue;
|
|
|
|
|
|
@@ -24,41 +24,31 @@ namespace Communicator
|
|
|
_connectionUrl = "http://localhost:9999/NewUI";
|
|
|
_hubName = "UIHub";
|
|
|
|
|
|
- _changedDataItems = new BufferBlock<KeyValuePair<string, object>>();
|
|
|
- _cancellationTokenSource = new CancellationTokenSource();
|
|
|
- _subscribedKeys = new ConcurrentDictionary<string, List<Action<object>>>();
|
|
|
- Task.Run(() =>
|
|
|
- {
|
|
|
- ProcessReceivedData();
|
|
|
- });
|
|
|
+ _changedDataItems = new();
|
|
|
+ _cancellationTokenSource = new();
|
|
|
+
|
|
|
+ Task processReceivedDataTask = new(ProcessReceivedData);
|
|
|
+ processReceivedDataTask.Start();
|
|
|
}
|
|
|
|
|
|
- public bool Initialize()
|
|
|
+ public bool Initialize(ICommunicatorProvider provider)
|
|
|
{
|
|
|
- try
|
|
|
+ ArgumentNullException.ThrowIfNull(provider, nameof(provider));
|
|
|
+ _provider = provider;
|
|
|
+
|
|
|
+ if (_hubConnection is not null)
|
|
|
{
|
|
|
- if (_hubConnection is not null)
|
|
|
- {
|
|
|
- return true;
|
|
|
- }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
|
|
|
+ try
|
|
|
+ {
|
|
|
_hubConnection = new HubConnection(_connectionUrl);
|
|
|
_hubConnection.StateChanged += OnStateChanged;
|
|
|
|
|
|
_hubProxy = _hubConnection.CreateHubProxy(_hubName);
|
|
|
- _hubProxy.On<Dictionary<string, object>>("ReceiveChangedDataItems", items =>
|
|
|
- {
|
|
|
- if(items.Count<=0)
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- foreach (var item in items)
|
|
|
- {
|
|
|
- _changedDataItems.Post(item);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
+ _hubProxy.On<Dictionary<string, object>>(nameof(ReceiveChangedDataItems), ReceiveChangedDataItems);
|
|
|
+ _hubConnection.Start().Wait();
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
@@ -68,20 +58,16 @@ namespace Communicator
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void SubscribeDataItem(string dataKey, Action<object> callback)
|
|
|
+ private void ReceiveChangedDataItems(Dictionary<string, object> dataItems)
|
|
|
{
|
|
|
- if(!_subscribedKeys.ContainsKey(dataKey))
|
|
|
+ if (dataItems.Count <= 0)
|
|
|
{
|
|
|
- _subscribedKeys.TryAdd(dataKey, new List<Action<object>>());
|
|
|
+ return;
|
|
|
}
|
|
|
- _subscribedKeys[dataKey].Add(callback);
|
|
|
- }
|
|
|
|
|
|
- public void UnsubscribeDataItem(string dataKey, Action<object> callback)
|
|
|
- {
|
|
|
- if( _subscribedKeys.ContainsKey(dataKey))
|
|
|
+ foreach (var item in dataItems)
|
|
|
{
|
|
|
- _subscribedKeys[dataKey].Remove(callback);
|
|
|
+ _changedDataItems.Post(item);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -90,15 +76,7 @@ namespace Communicator
|
|
|
while (await _changedDataItems.OutputAvailableAsync() && !_cancellationTokenSource.IsCancellationRequested)
|
|
|
{
|
|
|
var item = await _changedDataItems.ReceiveAsync();
|
|
|
- if (!_subscribedKeys.ContainsKey(item.Key))
|
|
|
- {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- foreach (var action in _subscribedKeys[item.Key])
|
|
|
- {
|
|
|
- action?.Invoke(item.Value);
|
|
|
- }
|
|
|
+ _provider?.DataChangedNotify(item.Key, item.Value);
|
|
|
}
|
|
|
}
|
|
|
|