Browse Source

Merge branch 'master' of http://git.jetplasma-oa.com/Jet/ApolloNewWorld

Zixuan 1 week ago
parent
commit
b7d5455507

+ 1 - 3
FurnaceNewWorld/Communication/Communicator/ICommunicator.cs

@@ -2,9 +2,7 @@
 
 public interface ICommunicator : IDisposable
 {
-    bool Initialize(ICommunicatorProvider communicatorProvider);
-    void SubscribeDataItem(string dataKey, Action<object> callback);
-    void UnsubscribeDataItem(string dataKey, Action<object> callback);
+    bool Initialize(ICommunicatorProvider provider);
 }
 
 public interface ICommunicatorProvider

+ 16 - 0
FurnaceNewWorld/Communication/Communicator/MockCommunicatorProvider.cs

@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Communicator
+{
+    public class MockCommunicatorProvider : ICommunicatorProvider
+    {
+        void ICommunicatorProvider.DataChangedNotify(string dataKey, object rawData)
+        {
+            //TODO:
+        }
+    }
+}

+ 23 - 45
FurnaceNewWorld/Communication/Communicator/UISignalRClient.cs

@@ -15,7 +15,7 @@ namespace Communicator
         private readonly BufferBlock<KeyValuePair<string, object>> _changedDataItems;
         private readonly CancellationTokenSource _cancellationTokenSource;
 
-        private readonly ConcurrentDictionary<string,List<Action<object>>> _subscribedKeys;
+        private ICommunicatorProvider? _provider;
 
         private bool disposedValue;
 
@@ -24,41 +24,31 @@ namespace Communicator
             _connectionUrl = "http://localhost:9999/NewUI";
             _hubName = "UIHub";
 
-            _changedDataItems = new BufferBlock<KeyValuePair<string, object>>();
-            _cancellationTokenSource = new CancellationTokenSource();
-            _subscribedKeys = new ConcurrentDictionary<string, List<Action<object>>>();
-            Task.Run(() =>
-            {
-                ProcessReceivedData();
-            });
+            _changedDataItems = new();
+            _cancellationTokenSource = new();
+
+            Task processReceivedDataTask = new(ProcessReceivedData);
+            processReceivedDataTask.Start();
         }
 
-        public bool Initialize()
+        public bool Initialize(ICommunicatorProvider provider)
         {
-            try
+            ArgumentNullException.ThrowIfNull(provider, nameof(provider));
+            _provider = provider;
+
+            if (_hubConnection is not null)
             {
-                if (_hubConnection is not null)
-                {
-                    return true;
-                }
+                return true;
+            }
 
+            try
+            {
                 _hubConnection = new HubConnection(_connectionUrl);
                 _hubConnection.StateChanged += OnStateChanged;
 
                 _hubProxy = _hubConnection.CreateHubProxy(_hubName);
-                _hubProxy.On<Dictionary<string, object>>("ReceiveChangedDataItems", items =>
-                {
-                    if(items.Count<=0)
-                    {
-                        return;
-                    }
-
-                    foreach (var item in items)
-                    {
-                        _changedDataItems.Post(item);
-                    }
-                });
-
+                _hubProxy.On<Dictionary<string, object>>(nameof(ReceiveChangedDataItems), ReceiveChangedDataItems);
+                _hubConnection.Start().Wait();
 
                 return true;
             }
@@ -68,20 +58,16 @@ namespace Communicator
             }
         }
 
-        public void SubscribeDataItem(string dataKey, Action<object> callback)
+        private void ReceiveChangedDataItems(Dictionary<string, object> dataItems)
         {
-            if(!_subscribedKeys.ContainsKey(dataKey))
+            if (dataItems.Count <= 0)
             {
-                _subscribedKeys.TryAdd(dataKey, new List<Action<object>>());
+                return;
             }
-            _subscribedKeys[dataKey].Add(callback);
-        }
 
-        public void UnsubscribeDataItem(string dataKey, Action<object> callback)
-        {
-            if( _subscribedKeys.ContainsKey(dataKey))
+            foreach (var item in dataItems)
             {
-                _subscribedKeys[dataKey].Remove(callback);
+                _changedDataItems.Post(item);
             }
         }
 
@@ -90,15 +76,7 @@ namespace Communicator
             while (await _changedDataItems.OutputAvailableAsync() && !_cancellationTokenSource.IsCancellationRequested)
             {
                 var item = await _changedDataItems.ReceiveAsync();
-                if (!_subscribedKeys.ContainsKey(item.Key))
-                {
-                    continue;
-                }
-
-                foreach (var action in _subscribedKeys[item.Key])
-                {
-                    action?.Invoke(item.Value);
-                }
+                _provider?.DataChangedNotify(item.Key, item.Value);
             }
         }