DataCollectionManager.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using Aitex.Core.Util;
  6. using Npgsql;
  7. using System.Threading;
  8. using Aitex.Core.RT.Log;
  9. using Aitex.Core.RT.Event;
  10. using Aitex.Core.RT.DBCore;
  11. using System.IO;
  12. using Aitex.Common.Util;
  13. using Aitex.Core.RT.DataCenter;
  14. using System.Collections.Concurrent;
  15. namespace Aitex.Core.RT.DataCollection
  16. {
  17. public class DataCollectionManager : Singleton<DataCollectionManager>
  18. {
  19. NpgsqlConnection _conn;
  20. bool _bAlive = true;
  21. Dictionary<string, KeyValuePair<string, Func<object>>> _preSubscribedRecordedData; //for other thread to subscribe data
  22. Dictionary<string, KeyValuePair<string, Func<object>>> _subscribedRecordedData; //internal use
  23. Dictionary<string, KeyValuePair<string, Func<object>>> _subscribedDataList; //internal use for data service function
  24. object _lock = new object();
  25. object _lock2 = new object();
  26. R_TRIG _connFailTrig = new R_TRIG();
  27. IDataCollectionCallback _callback;
  28. public void Initialize(IDataCollectionCallback callback)
  29. {
  30. _callback = callback == null ? new DefaultDataCollectionCallback() : callback;
  31. //PrepareDatabaseTable();
  32. _preSubscribedRecordedData = new Dictionary<string, KeyValuePair<string, Func<object>>>();
  33. _subscribedRecordedData = new Dictionary<string, KeyValuePair<string, Func<object>>>();
  34. _subscribedDataList = new Dictionary<string, KeyValuePair<string, Func<object>>>();
  35. System.Threading.Tasks.Task.Factory.StartNew(DataRecorderThread);
  36. }
  37. public void Terminate()
  38. {
  39. _bAlive = false;
  40. }
  41. /// <summary>
  42. /// 检查数据库表结构是否需要更新、新建、修改操作
  43. /// 新建RecipeRunHistory表格,为了将之前的工艺历史数据导入到新表中,在此处执行对应的SQL更新语句
  44. /// </summary>
  45. // void PrepareDatabaseTable()
  46. //{
  47. // string sqlFile = _callback.GetSqlUpdateFile();
  48. // if (string.IsNullOrEmpty(sqlFile) || !File.Exists(sqlFile))
  49. // {
  50. // LOG.Info("没有更新Sql数据库,文件:"+sqlFile);
  51. // return;
  52. // }
  53. // using (StreamReader fs = new System.IO.StreamReader(sqlFile ))
  54. // {
  55. // string sql = fs.ReadToEnd();
  56. // PostgresqlHelper.ExecuteNonQuery(sql);
  57. // LOG.Write("完成对数据库表格的更新操作");
  58. // }
  59. //}
  60. /// <summary>
  61. /// Get subscribed data value by data name
  62. /// </summary>
  63. /// <param name="dataName"></param>
  64. /// <returns></returns>
  65. public object GetSubscribedDataValue(string dataName)
  66. {
  67. try
  68. {
  69. if (_subscribedDataList.ContainsKey(dataName))
  70. return _subscribedDataList[dataName].Value.Invoke();
  71. }
  72. catch (Exception ex)
  73. {
  74. LOG.Write(ex);
  75. }
  76. return null;
  77. }
  78. /// <summary>
  79. /// 获取注册的变量名
  80. /// </summary>
  81. /// <returns></returns>
  82. public IEnumerable<string> GetDataVariableNames()
  83. {
  84. return _subscribedDataList.Keys.ToList();
  85. }
  86. /// <summary>
  87. /// initialization
  88. /// </summary>
  89. public void SubscribeData(string dataName, string alias, Func<object> dataValue)
  90. {
  91. lock (_lock)
  92. {
  93. if (!_preSubscribedRecordedData.Keys.Contains(dataName))
  94. {
  95. _preSubscribedRecordedData.Add(dataName, new KeyValuePair<string, Func<object>>(alias, dataValue));
  96. }
  97. }
  98. lock (_lock2)
  99. {
  100. if (!_subscribedDataList.Keys.Contains(dataName))
  101. {
  102. _subscribedDataList.Add(dataName, new KeyValuePair<string, Func<object>>(alias, dataValue));
  103. }
  104. }
  105. }
  106. void MonitorDataCenter()
  107. {
  108. lock (_lock2)
  109. {
  110. SortedDictionary<string, Func<object>> data = DATA.GetDBRecorderList();
  111. foreach (var item in data)
  112. {
  113. if (!_subscribedDataList.ContainsKey(item.Key))
  114. {
  115. object o = item.Value.Invoke();
  116. if (o == null)
  117. continue;
  118. Type dataType = o.GetType();
  119. if (dataType == typeof(Boolean) || dataType == typeof(double) || dataType == typeof(float) ||
  120. dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))
  121. SubscribeData(item.Key, item.Key, item.Value);
  122. }
  123. }
  124. }
  125. }
  126. /// <summary>
  127. /// open database connection
  128. /// </summary>
  129. /// <returns></returns>
  130. bool DBOpen()
  131. {
  132. bool result = true;
  133. try
  134. {
  135. if (_conn != null) _conn.Close();
  136. _conn = new NpgsqlConnection(PostgresqlHelper.ConnectionString);
  137. _conn.Open();
  138. _conn.ChangeDatabase(_callback.GetDBName());
  139. }
  140. catch (Exception ex)
  141. {
  142. if (_conn != null)
  143. {
  144. _conn.Close();
  145. _conn = null;
  146. }
  147. result = false;
  148. LOG.Write(ex);
  149. }
  150. return result;
  151. }
  152. /// <summary>
  153. /// 重置数据库连接出错事件
  154. /// </summary>
  155. public void Reset()
  156. {
  157. _connFailTrig.CLK = false;
  158. }
  159. /// <summary>
  160. /// data recorder thread
  161. /// </summary>
  162. void DataRecorderThread()
  163. {
  164. while (_bAlive)
  165. {
  166. try
  167. {
  168. Thread.Sleep(1000);
  169. var ret = DBOpen();
  170. //数据连接失败事件 触发器
  171. _connFailTrig.CLK = !ret;
  172. if (_connFailTrig.Q)
  173. _callback.PostDBFailedEvent();
  174. if (!ret) continue;
  175. //create one database table for each chamber for each day
  176. var today = DateTime.Now.Date;
  177. var now = DateTime.Now;
  178. MonitorDataCenter();
  179. if (_preSubscribedRecordedData.Count > 0)
  180. {
  181. lock (_lock)
  182. {
  183. foreach (var dataId in _preSubscribedRecordedData.Keys)
  184. {
  185. if (!_subscribedRecordedData.Keys.Contains(dataId))
  186. _subscribedRecordedData.Add(dataId, _preSubscribedRecordedData[dataId]);
  187. }
  188. _preSubscribedRecordedData.Clear();
  189. }
  190. }
  191. //pre-create insert sql sentence
  192. string insertSql = "";
  193. var tblName = string.Format("{0}.{1}", today.ToString("yyyyMMdd"), _callback.GetDataTablePrefix());
  194. //query if table already exist?
  195. string sqlTblDefine = string.Format("select column_name from information_schema.columns where table_name = '{0}';", tblName);
  196. NpgsqlCommand cmdTblDefine = new NpgsqlCommand(sqlTblDefine, _conn);
  197. var tblDefineData = cmdTblDefine.ExecuteReader();
  198. string tblAlertString = string.Empty;
  199. List<string> colNameList = new List<string>();
  200. while (tblDefineData.Read())
  201. {
  202. for (int i = 0; i < tblDefineData.FieldCount; i++)
  203. colNameList.Add(tblDefineData[i].ToString());
  204. }
  205. tblDefineData.Close();
  206. if (colNameList.Count > 0)
  207. {
  208. //table exist
  209. foreach (var dataName in _subscribedRecordedData.Keys)
  210. {
  211. if (!colNameList.Contains(dataName))
  212. {
  213. var dataType = _subscribedRecordedData[dataName].Value.Invoke().GetType();
  214. if (dataType == typeof(Boolean))
  215. {
  216. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Boolean");
  217. }
  218. else if (dataType == typeof(double) || dataType == typeof(float) || dataType == typeof(int) || dataType == typeof(Int16))
  219. {
  220. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};", tblName, dataName, "Real");
  221. }
  222. else
  223. {
  224. }
  225. }
  226. }
  227. if (!string.IsNullOrEmpty(tblAlertString))
  228. {
  229. try
  230. {
  231. NpgsqlCommand alertTblCmd = new NpgsqlCommand(tblAlertString, _conn);
  232. alertTblCmd.ExecuteNonQuery();
  233. }
  234. catch (Exception ex)
  235. {
  236. LOG.Write(ex);
  237. throw;
  238. }
  239. }
  240. }
  241. else
  242. {
  243. //table not exist, auto generate a table
  244. string createTble = string.Format("CREATE TABLE \"{0}\"(Time bigint NOT NULL,", tblName);
  245. string commentStr = "";
  246. foreach (var dataName in _subscribedRecordedData.Keys)
  247. {
  248. Type dataType = _subscribedRecordedData[dataName].Value.Invoke().GetType();
  249. string alias = _subscribedRecordedData[dataName].Key;
  250. if (dataType == typeof(Boolean))
  251. {
  252. createTble += string.Format("\"{0}\" Boolean,\n", dataName);
  253. }
  254. else if (dataType == typeof(double) || dataType == typeof(float) ||
  255. dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))
  256. {
  257. createTble += string.Format("\"{0}\" Real,\n", dataName);
  258. }
  259. if (!string.IsNullOrEmpty(alias) && ((dataType == typeof(Boolean)) || (dataType == typeof(double) || dataType == typeof(float) ||
  260. dataType == typeof(int) || dataType == typeof(ushort) || dataType == typeof(short))))
  261. commentStr += string.Format("COMMENT ON COLUMN \"{0}\".\"{1}\" IS '{2}';\n", tblName, dataName, alias);
  262. }
  263. createTble += string.Format("CONSTRAINT \"{0}_pkey\" PRIMARY KEY (Time));", tblName);
  264. createTble += string.Format("GRANT SELECT ON TABLE \"{0}\" TO postgres;", tblName);
  265. // createTble += string.Format("CREATE INDEX \"{0}_time_idx\" ON \"{0}\" USING btree (\"time\" );", tblName);
  266. createTble += commentStr;
  267. try
  268. {
  269. NpgsqlCommand cmd = new NpgsqlCommand(createTble.ToString(), _conn);
  270. cmd.ExecuteNonQuery();
  271. }
  272. catch (Exception ex1)
  273. {
  274. LOG.Write(ex1, "创建数据库表格失败");
  275. throw;
  276. }
  277. }
  278. //pre-create insert SQL sentence
  279. string preCreatedInsertSQL = string.Format("INSERT INTO \"{0}\"(\"time\" ", tblName);
  280. foreach (var dataId in _subscribedRecordedData.Keys)
  281. {
  282. preCreatedInsertSQL += string.Format(",\"{0}\"", dataId);
  283. }
  284. preCreatedInsertSQL += ")";
  285. insertSql = preCreatedInsertSQL;
  286. //insert data into database
  287. StringBuilder sb = new StringBuilder(10000);
  288. while (_bAlive)
  289. {
  290. Thread.Sleep(990); //for time delay in sleep function
  291. sb.Remove(0, sb.Length);
  292. sb.Append("Values(");
  293. sb.Append(DateTime.Now.Ticks.ToString());
  294. foreach (var dataName in _subscribedRecordedData.Keys)
  295. {
  296. sb.Append(",");
  297. var v1 = _subscribedRecordedData[dataName].Value.Invoke();
  298. if (v1 == null)
  299. {
  300. sb.Append("0");
  301. }
  302. else
  303. {
  304. if (v1.GetType() == typeof(double) || v1.GetType() == typeof(float))
  305. {
  306. double v2 = Convert.ToDouble(v1); //fixed SMART data exception
  307. if (double.IsNaN(v2))
  308. v2 = 0;
  309. sb.Append(v2.ToString());
  310. }
  311. else
  312. {
  313. sb.Append(v1.ToString());
  314. }
  315. }
  316. }
  317. sb.Append(");");
  318. NpgsqlCommand cmd = new NpgsqlCommand(insertSql + sb.ToString(), _conn);
  319. try
  320. {
  321. cmd.ExecuteNonQuery();
  322. }
  323. catch (Exception ex)
  324. {
  325. LOG.Write(ex, "数据记录发生异常"+insertSql);
  326. throw;
  327. }
  328. //if alert to another day, create a new table
  329. if (DateTime.Now.Date != today)
  330. break;
  331. //if preSubscribed data not empty, alert current table's structure
  332. if (_preSubscribedRecordedData.Count > 0)
  333. break;
  334. }//end while
  335. }//end while
  336. catch (Exception ex)
  337. {
  338. LOG.Write(ex, "数据库操作记录发生异常");
  339. EV.PostWarningLog("Database", "Database record failed,"+ex.Message);
  340. break;
  341. }
  342. }
  343. }
  344. }
  345. }