using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Aitex.Core.Util;
using Aitex.Core.Utilities;
using Aitex.Core.RT.Event;
using Aitex.Core.RT.Log;
using System.Data;
using System.IO;
using Aitex.Core.RT.OperationCenter;
using MECF.Framework.Common.DBCore;
using MECF.Framework.Common.ControlDataContext;
using CyberX8_Core;

namespace Aitex.Core.RT.DBCore
{
    /// <summary>
    /// <see cref="ICommonDB"/> implementation backed by a <c>PostgresqlDB</c> connection.
    /// Statements passed to <see cref="Insert"/> / <see cref="Update"/> are queued and executed
    /// by a periodic job; <see cref="SyncInsert"/> / <see cref="SyncUpdate"/> execute immediately
    /// on the calling thread.
    /// </summary>
    public class DatabaseManager : ICommonDB
    {
        /// <summary>A queued statement is dropped once it has failed this many times.</summary>
        private const int MaxSqlAttempts = 3;

        /// <summary>Log message format used when a statement fails; {0} is the SQL text.</summary>
        private const string SqlErrorFormat = "执行数据库操作错误, {0}";

        private PeriodicJob _thread;

        // NOTE(review): the reviewed text shows FixSizeQueue without type arguments; if the type
        // is generic the argument was lost in extraction (usage below dequeues strings) - confirm
        // against the FixSizeQueue declaration.
        private FixSizeQueue _sqlQueue = new FixSizeQueue(1000);

        // Failure count per SQL text for statements currently being retried by PeriodicRun.
        private readonly Dictionary<string, int> _sqlErrorDictionary = new Dictionary<string, int>();

        private PostgresqlDB _db = new PostgresqlDB();
        private DatabaseCleaner _cleaner = new DatabaseCleaner();

        /// <summary>Status reported by the underlying database object (<c>_db.DBStatus</c>).</summary>
        public DataBaseStatus DataBaseStatus => _db.DBStatus;

        /// <summary>
        /// Opens the database, optionally runs the SQL script in <paramref name="sqlFile"/>,
        /// starts the cleaner and the background job that drains the statement queue, registers
        /// this instance as <c>DB.Instance</c>, upgrades the data tables and seeds a default
        /// user when no users exist.
        /// </summary>
        /// <param name="connectionString">Connection string; must not be null or empty.</param>
        /// <param name="dbName">Database name passed to <c>_db.Open</c> and the cleaner.</param>
        /// <param name="sqlFile">Path of a SQL script to execute after a successful open; may be null, empty or missing.</param>
        /// <exception cref="ApplicationException">
        /// <paramref name="connectionString"/> is null or empty.
        /// </exception>
        public void Initialize(string connectionString, string dbName, string sqlFile)
        {
            if (string.IsNullOrEmpty(connectionString))
            {
                throw new ApplicationException("数据库连接字段未设置");
            }

            PostgresqlHelper.ConnectionString = connectionString;

            // A failed open is tolerated without logging or throwing (the original code had the
            // log call disabled); the remaining initialization still runs.
            if (_db.Open(connectionString, dbName))
            {
                PrepareDatabaseTable(sqlFile);
            }

            _cleaner.Initialize(dbName);

            // NOTE(review): 100 is presumably the job interval in milliseconds and 'true' an
            // auto-start flag - confirm against PeriodicJob.
            _thread = new PeriodicJob(100, this.PeriodicRun, "DBJob", true);

            DB.Instance = this;
            DatabaseTable.UpgradeDataTable();

            if (UserDataRecorder.GetUserItems().Count == 0)
            {
                // NOTE(security): this seeds a well-known default credential (admin/admin).
                // Behavior is kept as-is; consider forcing a password change on first login.
                UserItem userItem = new UserItem()
                {
                    Name = "admin",
                    Role = Role.Manager.ToString(),
                    Password = "admin",
                    Notes = "System Default"
                };
                UserDataRecorder.InserUser(userItem);
            }
        }

        /// <summary>
        /// Stops the background job (if one was started), terminates the cleaner and closes
        /// the database connection.
        /// </summary>
        public void Terminate()
        {
            if (_thread != null)
            {
                _thread.Stop();
                _thread = null;
            }

            _cleaner.Terminate();
            _db.Close();
        }

        /// <summary>
        /// Periodic job callback: executes queued statements while the connection is active.
        /// A failing statement is logged and re-queued until it has failed
        /// <see cref="MaxSqlAttempts"/> times, after which it is dropped.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        private bool PeriodicRun()
        {
            if (!_db.ActiveConnection())
            {
                return true;
            }

            string sql;
            while (_sqlQueue.TryDequeue(out sql))
            {
                try
                {
                    _db.ExecuteNonQuery(sql);
                }
                catch (Exception ex)
                {
                    LOG.WriteDBExeption(string.Format(SqlErrorFormat, sql), ex);

                    // Do the retry bookkeeping before reconnecting so the statement is not
                    // lost if the reconnect itself fails.
                    RequeueOrDrop(sql);
                    TryReconnect();

                    // If the connection is still down (same gate as at the top of this method),
                    // stop for this tick; otherwise every queued statement would use up its
                    // attempts immediately and be dropped during an outage.
                    if (!_db.ActiveConnection())
                    {
                        break;
                    }

                    continue;
                }

                // The statement succeeded: forget any failures recorded for it so the
                // dictionary does not grow and a later identical statement gets a full
                // retry budget.
                if (_sqlErrorDictionary.Count > 0)
                {
                    _sqlErrorDictionary.Remove(sql);
                }
            }

            return true;
        }

        /// <summary>
        /// Records one more failure for <paramref name="sql"/> and either puts it back on the
        /// queue or, once it has failed <see cref="MaxSqlAttempts"/> times, drops it.
        /// </summary>
        private void RequeueOrDrop(string sql)
        {
            int failures;
            _sqlErrorDictionary.TryGetValue(sql, out failures); // leaves 0 when not present
            failures++;

            if (failures < MaxSqlAttempts)
            {
                _sqlErrorDictionary[sql] = failures;
                _sqlQueue.Enqueue(sql);
            }
            else
            {
                _sqlErrorDictionary.Remove(sql);
            }
        }

        /// <summary>
        /// Closes and reopens the connection after a failed statement. A failure of the
        /// reconnect is logged instead of being thrown out of the caller's error handler.
        /// </summary>
        private void TryReconnect()
        {
            try
            {
                _db.Close();
                _db.Open();
            }
            catch (Exception ex)
            {
                LOG.WriteExeption(ex);
            }
        }

        /// <summary>
        /// Executes <paramref name="sql"/> immediately. On failure the error is logged,
        /// a reconnect is attempted and 0 is returned.
        /// </summary>
        private int ExecuteSync(string sql)
        {
            try
            {
                return _db.ExecuteNonQuery(sql);
            }
            catch (Exception ex)
            {
                LOG.WriteDBExeption(string.Format(SqlErrorFormat, sql), ex);
                TryReconnect();
                return 0;
            }
        }

        /// <summary>
        /// Queues an insert statement for execution by the background job.
        /// </summary>
        /// <param name="sql">SQL text to execute.</param>
        public void Insert(string sql)
        {
            _sqlQueue.Enqueue(sql);
        }

        /// <summary>
        /// Executes an insert statement synchronously.
        /// </summary>
        /// <param name="sql">SQL text to execute.</param>
        /// <returns>The value returned by <c>_db.ExecuteNonQuery</c>, or 0 when execution failed.</returns>
        public int SyncInsert(string sql)
        {
            return ExecuteSync(sql);
        }

        /// <summary>
        /// Queues an update statement for execution by the background job.
        /// </summary>
        /// <param name="sql">SQL text to execute.</param>
        public void Update(string sql)
        {
            _sqlQueue.Enqueue(sql);
        }

        /// <summary>
        /// Executes an update statement synchronously.
        /// </summary>
        /// <param name="sql">SQL text to execute.</param>
        /// <returns>The value returned by <c>_db.ExecuteNonQuery</c>, or 0 when execution failed.</returns>
        public int SyncUpdate(string sql)
        {
            return ExecuteSync(sql);
        }

        // NOTE(review): the 'columns' parameters below are shown without type arguments in the
        // reviewed text; they were probably lost in extraction. Restore them from the ICommonDB
        // declaration - they are left exactly as found here.

        /// <summary>Forwards to <c>_db.CreateTableIfNotExisted</c>.</summary>
        public void CreateTableIfNotExisted(string table, Dictionary columns, bool addPID, string primaryKey)
        {
            _db.CreateTableIfNotExisted(table, columns, addPID, primaryKey);
        }

        /// <summary>Forwards to <c>_db.CreateTableIndexIfNotExisted</c>.</summary>
        public void CreateTableIndexIfNotExisted(string table, string index, string sql)
        {
            _db.CreateTableIndexIfNotExisted(table, index, sql);
        }

        /// <summary>Forwards to <c>_db.CreateTableColumn</c>.</summary>
        public void CreateTableColumn(string table, Dictionary columns)
        {
            _db.CreateTableColumn(table, columns);
        }

        /// <summary>Forwards to <c>_db.ExecuteDataset</c> and returns its result.</summary>
        /// <param name="cmdText">Command text.</param>
        /// <param name="p">Arguments passed through unchanged.</param>
        public DataSet ExecuteDataset(string cmdText, params object[] p)
        {
            return _db.ExecuteDataset(cmdText, p);
        }

        /// <summary>
        /// Executes the whole content of <paramref name="sqlFile"/> as one command.
        /// Does nothing when the path is null, empty or the file does not exist;
        /// any exception while reading or executing is logged and swallowed.
        /// </summary>
        private void PrepareDatabaseTable(string sqlFile)
        {
            if (string.IsNullOrEmpty(sqlFile) || !File.Exists(sqlFile))
            {
                // No script to apply.
                return;
            }

            try
            {
                string sql = File.ReadAllText(sqlFile);
                _db.ExecuteNonQuery(sql);
            }
            catch (Exception ex)
            {
                LOG.WriteExeption(ex);
            }
        }
    }
}