| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 | using System;using System.Collections.Generic;using System.Linq;using System.Text;using Aitex.Core.Util;using Npgsql;using System.Threading;using Aitex.Core.RT.Log;using Aitex.Core.RT.Event;using Aitex.Core.RT.DBCore;using System.IO;using Aitex.Common.Util;using Aitex.Core.RT.DataCenter;using System.Collections.Concurrent;using MECF.Framework.Common.Communications;using Aitex.Core.RT.SCCore;namespace Aitex.Core.RT.DataCollection{    public class DataCollectionManager : Singleton<DataCollectionManager>, IConnection    {        public string Address        {            get { return "DB:DataCollection"; }        }        public bool IsConnected { get; set; }        NpgsqlConnection _conn;         bool _bAlive = true;         Dictionary<string, Func<object>> _preSubscribedRecordedData; //for other thread to subscribe data         Dictionary<string, Func<object>> _subscribedRecordedData; //internal use          object _lock = new object();        F_TRIG _connFailTrig = new F_TRIG();        private int dataCollectionInterval;         IDataCollectionCallback _callback;        private string[] _modules = new string[] {"Data"};        public DataCollectionManager()        {            _preSubscribedRecordedData = new Dictionary<string, Func<object>>();            _subscribedRecordedData = new Dictionary<string, Func<object>>();            dataCollectionInterval = SC.ContainsItem("System.DataCollectionInterval") ? SC.GetValue<int>("System.DataCollectionInterval") : 1000;        }        public  void Initialize(IDataCollectionCallback callback)        {            _callback = callback == null ? new DefaultDataCollectionCallback() : callback;            Connect();            System.Threading.Tasks.Task.Factory.StartNew(DataRecorderThread);        }        public void Initialize(string[] modules, string dbName)        {            _callback = new DefaultDataCollectionCallback(dbName);            if (modules!=null && modules.Length>0 && !string.IsNullOrEmpty(modules[0]))                _modules = modules;            Connect();            System.Threading.Tasks.Task.Factory.StartNew(DataRecorderThread);        }        public void Initialize(string dbName)        {            _callback =  new DefaultDataCollectionCallback(dbName)  ;            Connect();            System.Threading.Tasks.Task.Factory.StartNew(DataRecorderThread);        }         public void Terminate()        {            _bAlive = false;        }        /// <summary>        /// initialization        /// </summary>        public  void SubscribeData(string dataName, string alias, Func<object> dataValue)        {            lock (_lock)            {                if (!_preSubscribedRecordedData.Keys.Contains(dataName))                {                    _preSubscribedRecordedData[dataName] = dataValue;                }            }        }        private void MonitorDataCenter()        {            lock (_lock)            {                SortedDictionary<string, Func<object>> data = DATA.GetDBRecorderList();                foreach (var item in data)                {                    if (!_subscribedRecordedData.ContainsKey(item.Key))                    {                        object o = item.Value.Invoke();                        if (o == null)                            continue;                        Type dataType = o.GetType();                        if (dataType == typeof(Boolean) || dataType == typeof(double) || dataType == typeof(float) ||                            dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))                        {                            _subscribedRecordedData[item.Key] = item.Value;                            _preSubscribedRecordedData[item.Key] = item.Value;                        }                    }                }            }        }        public bool Connect()        {            bool result = true;            try            {                if (_conn != null)                    _conn.Close();                _conn = new NpgsqlConnection(PostgresqlHelper.ConnectionString);                _conn.Open();                _conn.ChangeDatabase(_callback.GetDBName());                EV.PostInfoLog("DataCollection", "Connected with database");            }            catch (Exception ex)            {                if (_conn != null)                {                    _conn.Close();                    _conn = null;                }                result = false;                EV.PostInfoLog("DataCollection", "Can not connect database");                LOG.WriteExeption(ex);            }            IsConnected = result;            return result;        }        public bool Disconnect()        {            try            {                if (_conn != null)                {                    _conn.Close();                    _conn = null;                    EV.PostInfoLog("DataCollection", "Disconnected with database");                }            }            catch (Exception ex)            {                LOG.WriteExeption(ex);            }            IsConnected = false;            return true;        }         /// <summary>        /// 重置数据库连接出错事件        /// </summary>        public  void Reset()        {            _connFailTrig.CLK = false;        }        /// <summary>        /// data recorder thread        /// </summary>         void DataRecorderThread()        {            while (_bAlive)            {                try                {                    Thread.Sleep(dataCollectionInterval);                    _connFailTrig.CLK = IsConnected;// Connect();                    if (_connFailTrig.Q)                    {                        EV.PostWarningLog("DataCollection", "Can not connect with database");                    }                    if (!_connFailTrig.M)                        continue;                     MonitorDataCenter();                    var moduleDataItem = new Dictionary<string, Dictionary<string, Func<object>>>();                    var moduleInsertSql = new Dictionary<string, string>();                    foreach (var module in _modules)                    {                        moduleDataItem[module] = new Dictionary<string, Func<object>>();                    }                    string defaultModule = _modules.Contains("System") ? "System" : (_modules.Contains("Data") ? "Data" : (_modules[0]));                    if (_subscribedRecordedData.Count > 0)                    {                        lock (_lock)                        {                            bool foundModule;                            foreach (var dataName in _subscribedRecordedData.Keys)                            {                                foundModule = false;                                foreach (var module in _modules)                                {                                    if (dataName.StartsWith(module+"."))                                    {                                        moduleDataItem[module][dataName] = _subscribedRecordedData[dataName];                                        foundModule = true;                                        break;                                    }                                }                                if (!foundModule)                                {                                    moduleDataItem[defaultModule][dataName] = _subscribedRecordedData[dataName];                                }                            }                            _preSubscribedRecordedData.Clear();                        }                    }                    DateTime dtToday = DateTime.Now.Date;                    foreach (var module in _modules)                    {                        string tableName = $"{dtToday:yyyyMMdd}.{module}";                        UpdateTableSchema(tableName, moduleDataItem[module]);                        string preCreatedInsertSQL = string.Format("INSERT INTO \"{0}\"(\"time\" ", tableName);                        foreach (var dataName in moduleDataItem[module].Keys)                        {                            preCreatedInsertSQL += string.Format(",\"{0}\"", dataName);                        }                        preCreatedInsertSQL += ")";                        moduleInsertSql[module] = preCreatedInsertSQL;                    }                    //insert data into database                    StringBuilder sb = new StringBuilder(10000);                    while (_bAlive)                    {                        //Thread.Sleep(990);                        Thread.Sleep((int)(dataCollectionInterval * 0.99)); //for time delay in sleep function                        foreach (var module in _modules)                        {                            sb.Remove(0, sb.Length);                            sb.Append("Values(");                            sb.Append(DateTime.Now.Ticks.ToString());                            foreach (var dataName in moduleDataItem[module].Keys)                            {                                sb.Append(",");                                var v1 = moduleDataItem[module][dataName].Invoke();                                if (v1 == null)                                {                                    sb.Append("0");                                }                                else                                {                                    if (v1 is double || v1 is float)                                    {                                        double v2 = Convert.ToDouble(v1);                                        if (double.IsNaN(v2))                                            v2 = 0;                                        sb.Append(v2.ToString());                                    }                                    else                                    {                                        sb.Append(v1.ToString());                                    }                                }                            }                            sb.Append(");");                            NpgsqlCommand cmd = new NpgsqlCommand(moduleInsertSql[module] + sb.ToString(), _conn);                            try                            {                                cmd.ExecuteNonQuery();                            }                            catch (Exception ex)                            {                                LOG.WriteExeption("数据记录发生异常" + moduleInsertSql[module], ex);                            }                        }                        //if alert to another day, create a new table                         if (DateTime.Now.Date != dtToday)                            break;                        //if preSubscribed data not empty, alert current table's structure                        MonitorDataCenter();                        if (_preSubscribedRecordedData.Count > 0)                            break;                    }//end while                }//end while                catch (Exception ex)                {                    LOG.WriteExeption("数据库操作记录发生异常", ex);                }            }        }        private string UpdateTableSchema(string tblName, Dictionary<string, Func<object>> dataItem)        {            //query if table already exist?            string sqlTblDefine = string.Format("select column_name from information_schema.columns where table_name = '{0}';", tblName);            NpgsqlCommand cmdTblDefine = new NpgsqlCommand(sqlTblDefine, _conn);            var tblDefineData = cmdTblDefine.ExecuteReader();            string tblAlertString = string.Empty;            List<string> colNameList = new List<string>();            while (tblDefineData.Read())            {                for (int i = 0; i < tblDefineData.FieldCount; i++)                    colNameList.Add(tblDefineData[i].ToString());            }            tblDefineData.Close();            if (colNameList.Count > 0)            {                //table exist                foreach (var dataName in dataItem.Keys)                {                    if (!colNameList.Contains(dataName) )                    {                        var dataType = dataItem[dataName].Invoke().GetType();                        if (dataType == typeof(Boolean))                        {                            tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Boolean");                        }                        else if (dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(ushort) || dataType==typeof(short))                        {                            tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Real");                        }                        else                        {                        }                    }                }                if (!string.IsNullOrEmpty(tblAlertString))                {                    try                    {                        NpgsqlCommand alertTblCmd = new NpgsqlCommand(tblAlertString, _conn);                        alertTblCmd.ExecuteNonQuery();                    }                    catch (Exception ex)                    {                        LOG.WriteExeption(ex);                    }                }            }            else            {                //table not exist, auto generate a table                string createTble = string.Format("CREATE TABLE \"{0}\"(Time bigint NOT NULL,", tblName);                string commentStr = "";                foreach (var dataName in dataItem.Keys)                {                    Type dataType = dataItem[dataName].Invoke().GetType();                    if (dataType == typeof(Boolean))                    {                        createTble += string.Format("\"{0}\" Boolean,\n", dataName);                    }                    else if (dataType == typeof(double) || dataType == typeof(float) ||                        dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))                    {                        createTble += string.Format("\"{0}\" Real,\n", dataName);                    }                    //if (!string.IsNullOrEmpty(alias) && ((dataType == typeof(Boolean)) || (dataType == typeof(double) || dataType == typeof(float) ||                    //    dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))))                    //    commentStr += string.Format("COMMENT ON COLUMN \"{0}\".\"{1}\" IS '{2}';\n", tblName, dataName, alias);                }                createTble += string.Format("CONSTRAINT \"{0}_pkey\" PRIMARY KEY (Time));", tblName);                createTble += string.Format("GRANT SELECT ON TABLE \"{0}\" TO postgres;", tblName);                // createTble += string.Format("CREATE INDEX \"{0}_time_idx\"  ON \"{0}\" USING btree (\"time\" );", tblName);                createTble += commentStr;                try                {                    NpgsqlCommand cmd = new NpgsqlCommand(createTble.ToString(), _conn);                    cmd.ExecuteNonQuery();                }                catch (Exception ex1)                {                    LOG.WriteExeption("创建数据库表格失败", ex1);                    throw;                }            }            return tblName;        }    }}
 |