using System; using System.Collections.Generic; using System.Linq; using System.Text; using Aitex.Core.Util; using Npgsql; using System.Threading; using Aitex.Core.RT.Log; using Aitex.Core.RT.Event; using Aitex.Core.RT.DBCore; using System.IO; using Aitex.Common.Util; using Aitex.Core.RT.DataCenter; using System.Collections.Concurrent; namespace Aitex.Core.RT.DataCollection { public class DataCollectionManager : Singleton { NpgsqlConnection _conn; bool _bAlive = true; Dictionary>> _preSubscribedRecordedData; //for other thread to subscribe data Dictionary>> _subscribedRecordedData; //internal use Dictionary>> _subscribedDataList; //internal use for data service function object _lock = new object(); object _lock2 = new object(); R_TRIG _connFailTrig = new R_TRIG(); IDataCollectionCallback _callback; public void Initialize(IDataCollectionCallback callback) { _callback = callback == null ? new DefaultDataCollectionCallback() : callback; //PrepareDatabaseTable(); _preSubscribedRecordedData = new Dictionary>>(); _subscribedRecordedData = new Dictionary>>(); _subscribedDataList = new Dictionary>>(); System.Threading.Tasks.Task.Factory.StartNew(DataRecorderThread); } public void Terminate() { _bAlive = false; } /// /// 检查数据库表结构是否需要更新、新建、修改操作 /// 新建RecipeRunHistory表格,为了将之前的工艺历史数据导入到新表中,在此处执行对应的SQL更新语句 /// // void PrepareDatabaseTable() //{ // string sqlFile = _callback.GetSqlUpdateFile(); // if (string.IsNullOrEmpty(sqlFile) || !File.Exists(sqlFile)) // { // LOG.Info("没有更新Sql数据库,文件:"+sqlFile); // return; // } // using (StreamReader fs = new System.IO.StreamReader(sqlFile )) // { // string sql = fs.ReadToEnd(); // PostgresqlHelper.ExecuteNonQuery(sql); // LOG.Write("完成对数据库表格的更新操作"); // } //} /// /// Get subscribed data value by data name /// /// /// public object GetSubscribedDataValue(string dataName) { try { if (_subscribedDataList.ContainsKey(dataName)) return _subscribedDataList[dataName].Value.Invoke(); } catch (Exception ex) { LOG.Write(ex); } return null; } /// /// 获取注册的变量名 /// /// public IEnumerable GetDataVariableNames() { return _subscribedDataList.Keys.ToList(); } /// /// initialization /// public void SubscribeData(string dataName, string alias, Func dataValue) { lock (_lock) { if (!_preSubscribedRecordedData.Keys.Contains(dataName)) { _preSubscribedRecordedData.Add(dataName, new KeyValuePair>(alias, dataValue)); } } lock (_lock2) { if (!_subscribedDataList.Keys.Contains(dataName)) { _subscribedDataList.Add(dataName, new KeyValuePair>(alias, dataValue)); } } } void MonitorDataCenter() { lock (_lock2) { SortedDictionary> data = DATA.GetDBRecorderList(); foreach (var item in data) { if (!_subscribedDataList.ContainsKey(item.Key)) { object o = item.Value.Invoke(); if (o == null) continue; Type dataType = o.GetType(); if (dataType == typeof(Boolean) || dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short)) SubscribeData(item.Key, item.Key, item.Value); } } } } /// /// open database connection /// /// bool DBOpen() { bool result = true; try { if (_conn != null) _conn.Close(); _conn = new NpgsqlConnection(PostgresqlHelper.ConnectionString); _conn.Open(); _conn.ChangeDatabase(_callback.GetDBName()); } catch (Exception ex) { if (_conn != null) { _conn.Close(); _conn = null; } result = false; LOG.Write(ex); } return result; } /// /// 重置数据库连接出错事件 /// public void Reset() { _connFailTrig.CLK = false; } /// /// data recorder thread /// void DataRecorderThread() { while (_bAlive) { try { Thread.Sleep(1000); var ret = DBOpen(); //数据连接失败事件 触发器 _connFailTrig.CLK = !ret; if (_connFailTrig.Q) _callback.PostDBFailedEvent(); if (!ret) continue; //create one database table for each chamber for each day var today = DateTime.Now.Date; var now = DateTime.Now; MonitorDataCenter(); if (_preSubscribedRecordedData.Count > 0) { lock (_lock) { foreach (var dataId in _preSubscribedRecordedData.Keys) { if (!_subscribedRecordedData.Keys.Contains(dataId)) _subscribedRecordedData.Add(dataId, _preSubscribedRecordedData[dataId]); } _preSubscribedRecordedData.Clear(); } } //pre-create insert sql sentence string insertSql = ""; var tblName = string.Format("{0}.{1}", today.ToString("yyyyMMdd"), _callback.GetDataTablePrefix()); //query if table already exist? string sqlTblDefine = string.Format("select column_name from information_schema.columns where table_name = '{0}';", tblName); NpgsqlCommand cmdTblDefine = new NpgsqlCommand(sqlTblDefine, _conn); var tblDefineData = cmdTblDefine.ExecuteReader(); string tblAlertString = string.Empty; List colNameList = new List(); while (tblDefineData.Read()) { for (int i = 0; i < tblDefineData.FieldCount; i++) colNameList.Add(tblDefineData[i].ToString()); } tblDefineData.Close(); if (colNameList.Count > 0) { //table exist foreach (var dataName in _subscribedRecordedData.Keys) { if (!colNameList.Contains(dataName)) { var dataType = _subscribedRecordedData[dataName].Value.Invoke().GetType(); if (dataType == typeof(Boolean)) { tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Boolean"); } else if (dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(Int16)) { tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Real"); } else { } } } if (!string.IsNullOrEmpty(tblAlertString)) { try { NpgsqlCommand alertTblCmd = new NpgsqlCommand(tblAlertString, _conn); alertTblCmd.ExecuteNonQuery(); } catch (Exception ex) { LOG.Write(ex); throw; } } } else { //table not exist, auto generate a table string createTble = string.Format("CREATE TABLE \"{0}\"(Time bigint NOT NULL,", tblName); string commentStr = ""; foreach (var dataName in _subscribedRecordedData.Keys) { Type dataType = _subscribedRecordedData[dataName].Value.Invoke().GetType(); string alias = _subscribedRecordedData[dataName].Key; if (dataType == typeof(Boolean)) { createTble += string.Format("\"{0}\" Boolean,\n", dataName); } else if (dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short)) { createTble += string.Format("\"{0}\" Real,\n", dataName); } if (!string.IsNullOrEmpty(alias) && ((dataType == typeof(Boolean)) || (dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short)))) commentStr += string.Format("COMMENT ON COLUMN \"{0}\".\"{1}\" IS '{2}';\n", tblName, dataName, alias); } createTble += string.Format("CONSTRAINT \"{0}_pkey\" PRIMARY KEY (Time));", tblName); createTble += string.Format("GRANT SELECT ON TABLE \"{0}\" TO postgres;", tblName); // createTble += string.Format("CREATE INDEX \"{0}_time_idx\" ON \"{0}\" USING btree (\"time\" );", tblName); createTble += commentStr; try { NpgsqlCommand cmd = new NpgsqlCommand(createTble.ToString(), _conn); cmd.ExecuteNonQuery(); } catch (Exception ex1) { LOG.Write(ex1, "创建数据库表格失败"); throw; } } //pre-create insert SQL sentence string preCreatedInsertSQL = string.Format("INSERT INTO \"{0}\"(\"time\" ", tblName); foreach (var dataId in _subscribedRecordedData.Keys) { preCreatedInsertSQL += string.Format(",\"{0}\"", dataId); } preCreatedInsertSQL += ")"; insertSql = preCreatedInsertSQL; //insert data into database StringBuilder sb = new StringBuilder(10000); while (_bAlive) { Thread.Sleep(990); //for time delay in sleep function sb.Remove(0, sb.Length); sb.Append("Values("); sb.Append(DateTime.Now.Ticks.ToString()); foreach (var dataName in _subscribedRecordedData.Keys) { sb.Append(","); var v1 = _subscribedRecordedData[dataName].Value.Invoke(); if (v1 == null) { sb.Append("0"); } else { if (v1.GetType() == typeof(double) || v1.GetType() == typeof(float)) { double v2 = Convert.ToDouble(v1); //fixed SMART data exception if (double.IsNaN(v2)) v2 = 0; sb.Append(v2.ToString()); } else { sb.Append(v1.ToString()); } } } sb.Append(");"); NpgsqlCommand cmd = new NpgsqlCommand(insertSql + sb.ToString(), _conn); try { cmd.ExecuteNonQuery(); } catch (Exception ex) { LOG.Write(ex, "数据记录发生异常"+insertSql); throw; } //if alert to another day, create a new table if (DateTime.Now.Date != today) break; //if preSubscribed data not empty, alert current table's structure if (_preSubscribedRecordedData.Count > 0) break; }//end while }//end while catch (Exception ex) { LOG.Write(ex, "数据库操作记录发生异常"); EV.PostWarningLog("Database", "Database record failed,"+ex.Message); break; } } } } }