| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 | using System;using System.Collections.Generic;using System.Linq;using System.Text;using Aitex.Core.Util;using Aitex.Core.Utilities;using Aitex.Core.RT.Event;using Aitex.Core.RT.Log;using System.Data;using System.IO;using Aitex.Core.RT.OperationCenter;using MECF.Framework.Common.DBCore;using MECF.Framework.Common.ControlDataContext;using CyberX8_Core;namespace Aitex.Core.RT.DBCore{    public class DatabaseManager : ICommonDB    {        PeriodicJob _thread;        FixSizeQueue<string> _sqlQueue = new FixSizeQueue<string>(1000);        private Dictionary<string,int> _sqlErrorDictionary= new Dictionary<string,int>();        PostgresqlDB _db = new PostgresqlDB();        DatabaseCleaner _cleaner = new DatabaseCleaner();        public DataBaseStatus DataBaseStatus => _db.DBStatus;        public void Initialize(string connectionString, string dbName, string sqlFile)        {            if (string.IsNullOrEmpty(connectionString))            {                throw new ApplicationException("数据库连接字段未设置");            }            PostgresqlHelper.ConnectionString = connectionString;            if (!_db.Open(connectionString, dbName))            {                //LOG.Error("数据库连接失败");            }            else            {                PrepareDatabaseTable(sqlFile);            }            _cleaner.Initialize(dbName);            _thread = new PeriodicJob(100, this.PeriodicRun, "DBJob", true);            DB.Instance = this;            DatabaseTable.UpgradeDataTable();            if (UserDataRecorder.GetUserItems().Count == 0)            {                UserItem userItem = new UserItem() { Name = "admin", Role = Role.Manager.ToString(), Password = "admin", Notes = "System Default" };                UserDataRecorder.InserUser(userItem);            }        }        public void Terminate()        {            if (_thread != null)            {                _thread.Stop();                _thread = null;            }            _cleaner.Terminate();            _db.Close();        }        bool PeriodicRun()        {            if (!_db.ActiveConnection())                return true;            string sql;            while (_sqlQueue.TryDequeue(out sql))            {                try                {                    _db.ExecuteNonQuery(sql);                }                catch (Exception ex)                {                    LOG.WriteDBExeption(string.Format("执行数据库操作错误, {0}",  sql), ex);                    _db.Close();                    _db.Open();                    if(!_sqlErrorDictionary.ContainsKey(sql))                    {                        _sqlErrorDictionary[sql] = 0;                    }                    _sqlErrorDictionary[sql]++;                    if (_sqlErrorDictionary[sql] < 3)                    {                        _sqlQueue.Enqueue(sql);                    }                    else                    {                        _sqlErrorDictionary.Remove(sql);                    }                }            }            return true;        }        public void Insert(string sql)        {            _sqlQueue.Enqueue(sql);        }        /// <summary>        /// 同步插入        /// </summary>        /// <param name="sql"></param>        /// <returns></returns>        public int SyncInsert(string sql)        {            try            {                return _db.ExecuteNonQuery(sql);            }            catch(Exception ex)            {                LOG.WriteDBExeption(string.Format("执行数据库操作错误, {0}", sql), ex);                //if (ex.Message.Contains("Connection"))                    _db.Close();                    _db.Open();                return 0;            }        }        /// <summary>        /// 更新        /// </summary>        /// <param name="sql"></param>        public void Update(string sql)        {            _sqlQueue.Enqueue(sql);        }        /// <summary>        /// 更新        /// </summary>        /// <param name="sql"></param>        /// <returns></returns>        public int SyncUpdate(string sql)        {            try            {                return _db.ExecuteNonQuery(sql);            }            catch(Exception ex)            {                LOG.WriteDBExeption(string.Format("执行数据库操作错误, {0}", sql), ex);                //if (ex.Message.Contains("Connection"))                _db.Close();                _db.Open();                return 0;            }        }        public void CreateTableIfNotExisted(string table, Dictionary<string, Type> columns, bool addPID, string primaryKey)        {            _db.CreateTableIfNotExisted(table, columns, addPID, primaryKey);        }        public void CreateTableIndexIfNotExisted(string table, string index, string sql)        {            _db.CreateTableIndexIfNotExisted(table, index, sql);        }        public void CreateTableColumn(string table, Dictionary<string, Type> columns)        {            _db.CreateTableColumn(table, columns);        }        public DataSet ExecuteDataset(string cmdText, params object[] p)        {            return _db.ExecuteDataset(cmdText, p);        }        void PrepareDatabaseTable(string sqlFile)        {            if (string.IsNullOrEmpty(sqlFile) || !File.Exists(sqlFile))            {                //LOG.Info("没有更新Sql数据库,文件:" + sqlFile);                return;            }            try            {                using (StreamReader fs = new System.IO.StreamReader(sqlFile))                {                    string sql = fs.ReadToEnd();                    _db.ExecuteNonQuery(sql);                    //LOG.Write("完成对数据库表格的更新操作");                }            }            catch (Exception ex)            {                LOG.WriteExeption(ex);            }        }    }}
 |