| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 | 
							- using System;
 
- using System.Collections.Generic;
 
- using System.Linq;
 
- using System.Text;
 
- using Aitex.Core.Util;
 
- using Npgsql;
 
- using System.Threading;
 
- using Aitex.Core.RT.Log;
 
- using Aitex.Core.RT.Event;
 
- using Aitex.Core.RT.DBCore;
 
- using System.IO;
 
- using Aitex.Common.Util;
 
- using Aitex.Core.RT.DataCenter;
 
- using System.Collections.Concurrent;
 
- using MECF.Framework.Common.Communications;
 
- using Aitex.Core.RT.SCCore;
 
- namespace Aitex.Core.RT.DataCollection
 
- {
 
-     public class DataCollectionManager : Singleton<DataCollectionManager>, IConnection
 
-     {
 
/// <summary>
/// Fixed identifier string returned for this connection; always "DB:DataCollection".
/// </summary>
public string Address => "DB:DataCollection";
 
/// <summary>
/// Set to the outcome of <see cref="Connect"/> and to false by <see cref="Disconnect"/>;
/// polled by the recorder loop to decide whether to record.
/// </summary>
public bool IsConnected { get; set; }

// Connection used for every schema and insert command; replaced by Connect(), cleared by Disconnect().
private NpgsqlConnection _conn;

// Written by Terminate() and polled by the recorder loop on another thread, hence volatile.
private volatile bool _bAlive = true;

// For other threads to subscribe data; guarded by _lock.
private readonly Dictionary<string, Func<object>> _preSubscribedRecordedData;

// Internal use: the getters that are actually recorded; guarded by _lock.
private readonly Dictionary<string, Func<object>> _subscribedRecordedData;

// Guards both subscription dictionaries. Readonly so the monitor object can never be swapped.
private readonly object _lock = new object();

// Trigger fed with IsConnected by the recorder loop (not readonly: F_TRIG may be a value type).
private F_TRIG _connFailTrig = new F_TRIG();

// Sampling period handed to Thread.Sleep (milliseconds); assigned once in the constructor.
private readonly int dataCollectionInterval;

// Supplies the database name used by Connect(); assigned by Initialize().
private IDataCollectionCallback _callback;

// Module names; one table per module per day is written. Defaults to a single "Data" module.
private string[] _modules = new string[] { "Data" };

// Latch so that a run of consecutive database failures is logged only once.
private bool _dbFailed;
 
/// <summary>
/// Creates the empty subscription maps and reads the sampling interval from the
/// "System.DataCollectionInterval" configuration item, falling back to 1000 when
/// that item is not defined.
/// </summary>
public DataCollectionManager()
{
    _preSubscribedRecordedData = new Dictionary<string, Func<object>>();
    _subscribedRecordedData = new Dictionary<string, Func<object>>();

    const string intervalKey = "System.DataCollectionInterval";
    if (SC.ContainsItem(intervalKey))
    {
        dataCollectionInterval = SC.GetValue<int>(intervalKey);
    }
    else
    {
        dataCollectionInterval = 1000;
    }
}
 
/// <summary>
/// Starts data collection using the supplied callback.
/// </summary>
/// <param name="callback">
/// Callback that provides the database name; when null a default
/// <c>DefaultDataCollectionCallback</c> is used.
/// </param>
public void Initialize(IDataCollectionCallback callback)
{
    _callback = callback ?? new DefaultDataCollectionCallback();
    StartRecording();
}

/// <summary>
/// Starts data collection against the named database, writing one table per module.
/// </summary>
/// <param name="modules">
/// Module names. Ignored (the current module list is kept) when null, empty,
/// or when its first entry is null or empty.
/// </param>
/// <param name="dbName">Database name passed to the default callback.</param>
public void Initialize(string[] modules, string dbName)
{
    _callback = new DefaultDataCollectionCallback(dbName);

    if (modules != null && modules.Length > 0 && !string.IsNullOrEmpty(modules[0]))
    {
        _modules = modules;
    }

    StartRecording();
}

/// <summary>
/// Starts data collection against the named database with the current module list.
/// </summary>
/// <param name="dbName">Database name passed to the default callback.</param>
public void Initialize(string dbName)
{
    _callback = new DefaultDataCollectionCallback(dbName);
    StartRecording();
}

/// <summary>
/// Opens the database connection and launches the recorder loop. Shared by all
/// Initialize overloads so the start-up sequence is written once.
/// </summary>
private void StartRecording()
{
    Connect();

    // The recorder loop blocks for the lifetime of the process, so ask for a
    // dedicated thread instead of occupying a thread-pool worker indefinitely.
    System.Threading.Tasks.Task.Factory.StartNew(
        DataRecorderThread,
        System.Threading.Tasks.TaskCreationOptions.LongRunning);
}
 
/// <summary>
/// Clears the alive flag that the recorder loop checks in its while conditions,
/// so the loop ends the next time it evaluates one of them.
/// </summary>
public void Terminate() => _bAlive = false;
 
/// <summary>
/// Registers a value getter under <paramref name="dataName"/> in the pre-subscription
/// map. A name that is already present is left untouched.
/// </summary>
/// <param name="dataName">Key to register the getter under.</param>
/// <param name="alias">Not used by this method.</param>
/// <param name="dataValue">Delegate that returns the current value.</param>
public void SubscribeData(string dataName, string alias, Func<object> dataValue)
{
    lock (_lock)
    {
        if (_preSubscribedRecordedData.ContainsKey(dataName))
        {
            return;
        }

        _preSubscribedRecordedData.Add(dataName, dataValue);
    }
}
 
/// <summary>
/// Scans the list returned by DATA.GetDBRecorderList() and, for every entry not yet
/// recorded, samples it once. Entries whose sample is a bool, double, float, int,
/// ushort or short are added to both the recorded map and the pre-subscription map;
/// null samples and other types are skipped.
/// </summary>
private void MonitorDataCenter()
{
    lock (_lock)
    {
        SortedDictionary<string, Func<object>> recorderList = DATA.GetDBRecorderList();

        foreach (var entry in recorderList)
        {
            string name = entry.Key;
            Func<object> getter = entry.Value;

            // Already recorded: nothing to do.
            if (_subscribedRecordedData.ContainsKey(name))
            {
                continue;
            }

            // The type is only known from a live sample; a null sample cannot be classified.
            object sample = getter.Invoke();
            if (sample == null)
            {
                continue;
            }

            Type sampleType = sample.GetType();
            bool recordable =
                sampleType == typeof(Boolean) ||
                sampleType == typeof(double) ||
                sampleType == typeof(float) ||
                sampleType == typeof(int) ||
                sampleType == typeof(ushort) ||
                sampleType == typeof(short);

            if (!recordable)
            {
                continue;
            }

            _subscribedRecordedData[name] = getter;
            // A non-empty pre-subscription map is what makes the recorder loop rebuild its tables.
            _preSubscribedRecordedData[name] = getter;
        }
    }
}
 
/// <summary>
/// Closes any existing connection, opens a new one from PostgresqlHelper.ConnectionString
/// and switches it to the database named by the callback. On any exception the connection
/// is closed and dropped, and the exception is logged.
/// </summary>
/// <returns>True when the connection was opened; false otherwise. IsConnected is set to the same value.</returns>
public bool Connect()
{
    bool connected;

    try
    {
        _conn?.Close();

        _conn = new NpgsqlConnection(PostgresqlHelper.ConnectionString);
        _conn.Open();
        _conn.ChangeDatabase(_callback.GetDBName());

        EV.PostInfoLog("DataCollection", "Connected with database");
        connected = true;
    }
    catch (Exception ex)
    {
        // Drop the half-opened connection so later code does not reuse it.
        if (_conn != null)
        {
            _conn.Close();
            _conn = null;
        }

        connected = false;
        EV.PostInfoLog("DataCollection", "Can not connect database");
        LOG.Write(ex);
    }

    IsConnected = connected;
    return connected;
}
 
/// <summary>
/// Closes and drops the current connection, if there is one, and marks the manager as
/// disconnected. Exceptions raised while closing are logged, not propagated.
/// </summary>
/// <returns>Always true.</returns>
public bool Disconnect()
{
    try
    {
        bool hasConnection = _conn != null;
        if (hasConnection)
        {
            _conn.Close();
            _conn = null;
            EV.PostInfoLog("DataCollection", "Disconnected with database");
        }
    }
    catch (Exception ex)
    {
        LOG.Write(ex);
    }

    // Reported as disconnected even if closing failed.
    IsConnected = false;
    return true;
}
 
/// <summary>
/// Resets the database connection failure event by driving the connection trigger's
/// CLK input to false.
/// </summary>
public void Reset() => _connFailTrig.CLK = false;
 
// Lower bound for the waits on the disconnected and failure paths, so that a
// configured interval of zero (or less) can never turn those paths into a spin.
private const int MinIdleDelayMs = 10;

/// <summary>
/// Data recorder loop. While the manager is alive and connected it:
/// 1) groups the recorded getters by module and makes sure today's table for every
///    module has a column for each of them;
/// 2) inserts one row per module per sampling interval;
/// 3) rebuilds the tables when the date changes or new data has been subscribed.
/// Runs until <see cref="Terminate"/> clears the alive flag.
/// </summary>
void DataRecorderThread()
{
    while (_bAlive)
    {
        try
        {
            _connFailTrig.CLK = IsConnected;
            // NOTE(review): Q presumably pulses once when the connection drops — confirm against F_TRIG.
            if (_connFailTrig.Q)
            {
                EV.PostWarningLog("DataCollection", "Can not connect with database");
            }

            if (!_connFailTrig.M)
            {
                // Wait before polling the connection state again; without this the
                // loop spins at full speed for as long as the database is unavailable.
                Thread.Sleep(GetIdleDelayMs());
                continue;
            }

            MonitorDataCenter();

            DateTime dtToday = DateTime.Now.Date;
            Dictionary<string, Dictionary<string, Func<object>>> moduleDataItem;
            var moduleInsertSql = new Dictionary<string, string>();

            lock (_lock)
            {
                moduleDataItem = GroupSubscribedDataByModule();

                // Always acknowledge pending subscriptions. Clearing only when something
                // is already recorded left the map non-empty otherwise, which made the
                // sampling loop below break before its sleep on every pass.
                // NOTE(review): entries added through SubscribeData are dropped here without
                // ever being copied into the recorded set — confirm that this is intended.
                _preSubscribedRecordedData.Clear();

                foreach (var module in _modules)
                {
                    string tableName = $"{dtToday:yyyyMMdd}.{module}";
                    UpdateTableSchema(tableName, moduleDataItem[module]);

                    // Column list of the INSERT; the values are appended in the same order below.
                    var insertHead = new StringBuilder();
                    insertHead.AppendFormat("INSERT INTO \"{0}\"(\"time\" ", tableName);
                    foreach (var dataName in moduleDataItem[module].Keys)
                    {
                        insertHead.AppendFormat(",\"{0}\"", dataName);
                    }
                    insertHead.Append(")");

                    moduleInsertSql[module] = insertHead.ToString();
                }
            }

            // Insert data into the database until the day or the subscription set changes.
            StringBuilder sb = new StringBuilder(10000);
            while (_bAlive)
            {
                // On a new day, leave the loop so that a new table is created.
                if (DateTime.Now.Date != dtToday)
                {
                    break;
                }

                foreach (var module in _modules)
                {
                    sb.Clear();
                    sb.Append("Values(");
                    sb.Append(DateTime.Now.Ticks.ToString(System.Globalization.CultureInfo.InvariantCulture));

                    // Values enumerates in the same order as Keys, which built the column list.
                    foreach (var getter in moduleDataItem[module].Values)
                    {
                        sb.Append(",");
                        sb.Append(FormatSqlValue(getter.Invoke()));
                    }
                    sb.Append(");");

                    using (var cmd = new NpgsqlCommand(moduleInsertSql[module] + sb.ToString(), _conn))
                    {
                        try
                        {
                            cmd.ExecuteNonQuery();
                            _dbFailed = false;
                        }
                        catch (Exception ex)
                        {
                            // Log only the first failure of a run to avoid flooding the log.
                            if (!_dbFailed)
                            {
                                LOG.Write(ex, "数据记录发生异常" + moduleInsertSql[module]);
                                _dbFailed = true;
                            }
                        }
                    }
                }

                // If new data has been subscribed, leave the loop to alter the table structure.
                MonitorDataCenter();

                bool subscriptionsPending;
                lock (_lock)
                {
                    subscriptionsPending = _preSubscribedRecordedData.Count > 0;
                }
                if (subscriptionsPending)
                {
                    break;
                }

                // 99 % of the interval, leaving an allowance for the delay of the sleep call itself.
                Thread.Sleep((int)(dataCollectionInterval * 0.99));
            }

            _dbFailed = false;
        }
        catch (Exception ex)
        {
            if (!_dbFailed)
            {
                LOG.Write(ex, "数据库操作记录发生异常");
                _dbFailed = true;
            }

            // Back off before retrying so a persistent failure cannot spin the loop.
            Thread.Sleep(GetIdleDelayMs());
        }
    }
}

/// <summary>
/// Builds one getter map per module from the recorded data. A name belongs to the first
/// module for which it starts with "module." or "module_"; names matching no module go
/// to "System" if configured, else "Data" if configured, else the first module.
/// Must be called while holding <c>_lock</c>.
/// </summary>
private Dictionary<string, Dictionary<string, Func<object>>> GroupSubscribedDataByModule()
{
    var grouped = new Dictionary<string, Dictionary<string, Func<object>>>();
    foreach (var module in _modules)
    {
        grouped[module] = new Dictionary<string, Func<object>>();
    }

    string defaultModule = _modules.Contains("System")
        ? "System"
        : (_modules.Contains("Data") ? "Data" : _modules[0]);

    foreach (var item in _subscribedRecordedData)
    {
        string owner = defaultModule;
        foreach (var module in _modules)
        {
            // Data names are identifiers, so compare ordinally rather than by culture.
            if (item.Key.StartsWith(module + ".", StringComparison.Ordinal) ||
                item.Key.StartsWith(module + "_", StringComparison.Ordinal))
            {
                owner = module;
                break;
            }
        }

        grouped[owner][item.Key] = item.Value;
    }

    return grouped;
}

/// <summary>
/// Renders a sampled value as a SQL literal. Null and NaN become 0. Formatting is
/// culture-invariant so a comma decimal separator can never split one value into two.
/// </summary>
private static string FormatSqlValue(object value)
{
    if (value == null)
    {
        return "0";
    }

    if (value is double || value is float)
    {
        double number = Convert.ToDouble(value);
        if (double.IsNaN(number))
        {
            number = 0;
        }
        return number.ToString(System.Globalization.CultureInfo.InvariantCulture);
    }

    return Convert.ToString(value, System.Globalization.CultureInfo.InvariantCulture);
}

/// <summary>
/// Wait used while disconnected or after a failure: 99 % of the sampling interval,
/// but never less than <see cref="MinIdleDelayMs"/>.
/// </summary>
private int GetIdleDelayMs()
{
    return Math.Max(MinIdleDelayMs, (int)(dataCollectionInterval * 0.99));
}
 
/// <summary>
/// Makes sure the table <paramref name="tblName"/> exists and has a column for every
/// entry of <paramref name="dataItem"/>. A table with no columns in
/// information_schema.columns is treated as missing and is created with a bigint
/// "Time" primary key; otherwise the missing columns are added. Boolean samples map to
/// a Boolean column; double, float, int, ushort and short samples map to Real; any other
/// sample, including null, gets no column. Failures of the CREATE/ALTER command are
/// logged (once per run of failures) and not propagated; the column query is not guarded.
/// </summary>
/// <param name="tblName">Name of the table to create or extend.</param>
/// <param name="dataItem">Column name to value getter; each getter is sampled to determine the column type.</param>
/// <returns><paramref name="tblName"/>, unchanged.</returns>
private string UpdateTableSchema(string tblName, Dictionary<string, Func<object>> dataItem)
{
    // Query the existing columns; an empty result means the table does not exist yet.
    // A set gives constant-time membership tests in the loop below.
    var existingColumns = new HashSet<string>();
    string sqlTblDefine = string.Format("select column_name from information_schema.columns where table_name = '{0}';", tblName);
    using (var cmdTblDefine = new NpgsqlCommand(sqlTblDefine, _conn))
    using (var tblDefineData = cmdTblDefine.ExecuteReader())
    {
        // Disposing the reader even on failure keeps the shared connection usable.
        while (tblDefineData.Read())
        {
            for (int i = 0; i < tblDefineData.FieldCount; i++)
            {
                existingColumns.Add(tblDefineData[i].ToString());
            }
        }
    }

    if (existingColumns.Count > 0)
    {
        // Table exists: add whichever columns are missing.
        var alterSql = new StringBuilder();
        foreach (var item in dataItem)
        {
            if (existingColumns.Contains(item.Key))
            {
                continue;
            }

            string columnType = GetSqlColumnType(item.Value.Invoke());
            if (columnType != null)
            {
                alterSql.AppendFormat("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, item.Key, columnType);
            }
        }

        if (alterSql.Length > 0)
        {
            ExecuteSchemaCommand(alterSql.ToString(), null);
        }
    }
    else
    {
        // Table does not exist: generate it.
        var createSql = new StringBuilder();
        createSql.AppendFormat("CREATE TABLE \"{0}\"(Time bigint NOT NULL,", tblName);
        foreach (var item in dataItem)
        {
            string columnType = GetSqlColumnType(item.Value.Invoke());
            if (columnType != null)
            {
                createSql.AppendFormat("\"{0}\" {1},\n", item.Key, columnType);
            }
        }
        createSql.AppendFormat("CONSTRAINT \"{0}_pkey\" PRIMARY KEY (Time));", tblName);
        createSql.AppendFormat("GRANT SELECT ON TABLE \"{0}\" TO postgres;", tblName);

        ExecuteSchemaCommand(createSql.ToString(), "创建数据库表格失败");
    }

    return tblName;
}

/// <summary>
/// Maps a sampled value to its SQL column type: "Boolean" for bool, "Real" for double,
/// float, int, ushort and short, and null for anything else. A null sample also yields
/// null, because its type cannot be determined (the original dereferenced it and threw).
/// </summary>
private static string GetSqlColumnType(object sample)
{
    if (sample is bool)
    {
        return "Boolean";
    }

    if (sample is double || sample is float || sample is int || sample is ushort || sample is short)
    {
        return "Real";
    }

    return null;
}

/// <summary>
/// Runs a CREATE/ALTER statement on the shared connection. The first failure of a run
/// is logged, with <paramref name="failureMessage"/> when one is given; later failures
/// are suppressed until a command succeeds again.
/// </summary>
private void ExecuteSchemaCommand(string sql, string failureMessage)
{
    try
    {
        using (var cmd = new NpgsqlCommand(sql, _conn))
        {
            cmd.ExecuteNonQuery();
        }
        _dbFailed = false;
    }
    catch (Exception ex)
    {
        if (!_dbFailed)
        {
            if (failureMessage == null)
            {
                LOG.Write(ex);
            }
            else
            {
                LOG.Write(ex, failureMessage);
            }
            _dbFailed = true;
        }
    }
}
 
-     }
 
- }
 
 
  |