using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Aitex.Core.Util;
using Aitex.Core.Utilities;
using Aitex.Core.RT.Event;
using Aitex.Core.RT.Log;
using System.Data;
using System.IO;
using Aitex.Core.RT.OperationCenter;
using MECF.Framework.Common.DBCore;

namespace Aitex.Core.RT.DBCore
{
    /// <summary>
    /// Facade over <see cref="PostgresqlDB"/> that queues write statements and
    /// executes them from a periodic background job, and forwards schema and
    /// query calls directly to the underlying database object.
    /// </summary>
    public class DatabaseManager : ICommonDB
    {
        // Background job that drains _sqlQueue; null until Initialize and after Terminate.
        private PeriodicJob _thread;

        // Pending SQL statements submitted through Insert (queue constructed with size 1000).
        private readonly FixSizeQueue<string> _sqlQueue = new FixSizeQueue<string>(1000);

        private readonly PostgresqlDB _db = new PostgresqlDB();
        private readonly DatabaseCleaner _cleaner = new DatabaseCleaner();

        /// <summary>Status reported by the underlying database object.</summary>
        public DataBaseStatus DataBaseStatus => _db.DBStatus;

        /// <summary>
        /// Opens the database, optionally runs a schema script, starts the cleaner
        /// and the background write job, and publishes this instance as <c>DB.Instance</c>.
        /// </summary>
        /// <param name="connectionString">Connection string; must not be null or empty.</param>
        /// <param name="dbName">Database name passed to the database object and the cleaner.</param>
        /// <param name="sqlFile">Path of an SQL script to run after a successful open; may be null, empty or missing.</param>
        /// <exception cref="ApplicationException">Thrown when <paramref name="connectionString"/> is null or empty.</exception>
        public void Initialize(string connectionString, string dbName, string sqlFile)
        {
            if (string.IsNullOrEmpty(connectionString))
            {
                throw new ApplicationException("数据库连接字段未设置");
            }

            PostgresqlHelper.ConnectionString = connectionString;

            // A failed open is deliberately not fatal here: the job is still started and
            // PeriodicRun skips its work whenever ActiveConnection() reports false.
            if (_db.Open(connectionString, dbName))
            {
                PrepareDatabaseTable(sqlFile);
            }

            _cleaner.Initialize(dbName);

            // Stop a job left over from an earlier Initialize call so that replacing
            // the reference below does not orphan it.
            if (_thread != null)
            {
                _thread.Stop();
            }

            // NOTE(review): 100 is presumably the period in milliseconds and 'true'
            // presumably starts the job immediately - confirm against PeriodicJob.
            _thread = new PeriodicJob(100, this.PeriodicRun, "DBJob", true);

            DB.Instance = this;

            DatabaseTable.UpgradeDataTable();
        }

        /// <summary>
        /// Stops the background job, terminates the cleaner and closes the database.
        /// Statements still waiting in the queue are not executed.
        /// </summary>
        public void Terminate()
        {
            if (_thread != null)
            {
                _thread.Stop();
                _thread = null;
            }

            _cleaner.Terminate();

            _db.Close();
        }

        /// <summary>
        /// Job callback: when the connection is active, executes every queued statement.
        /// A failing statement is logged and dropped; the remaining ones still run.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        bool PeriodicRun()
        {
            if (!_db.ActiveConnection())
            {
                return true;
            }

            string sql;
            while (_sqlQueue.TryDequeue(out sql))
            {
                try
                {
                    _db.ExecuteNonQuery(sql);
                }
                catch (Exception ex)
                {
                    LOG.WriteExeption(string.Format("执行数据库操作错误, {0}", sql), ex);
                }
            }

            return true;
        }

        /// <summary>
        /// Queues a statement for execution by the background job.
        /// Null, empty or whitespace-only statements are ignored.
        /// </summary>
        /// <param name="sql">SQL text to execute later.</param>
        public void Insert(string sql)
        {
            // Nothing executable: queuing it would only make the worker fail and log.
            if (string.IsNullOrWhiteSpace(sql))
            {
                return;
            }

            _sqlQueue.Enqueue(sql);
        }

        /// <summary>Forwards to <c>PostgresqlDB.CreateTableIfNotExisted</c> with the same arguments.</summary>
        public void CreateTableIfNotExisted(string table, Dictionary<string, Type> columns, bool addPID, string primaryKey)
        {
            _db.CreateTableIfNotExisted(table, columns, addPID, primaryKey);
        }

        /// <summary>Forwards to <c>PostgresqlDB.CreateTableIndexIfNotExisted</c> with the same arguments.</summary>
        public void CreateTableIndexIfNotExisted(string table, string index, string sql)
        {
            _db.CreateTableIndexIfNotExisted(table, index, sql);
        }

        /// <summary>Forwards to <c>PostgresqlDB.CreateTableColumn</c> with the same arguments.</summary>
        public void CreateTableColumn(string table, Dictionary<string, Type> columns)
        {
            _db.CreateTableColumn(table, columns);
        }

        /// <summary>Runs a query synchronously on the underlying database object.</summary>
        /// <param name="cmdText">Command text.</param>
        /// <param name="p">Arguments forwarded unchanged to the database object.</param>
        /// <returns>The <see cref="DataSet"/> returned by the database object.</returns>
        public DataSet ExecuteDataset(string cmdText, params object[] p)
        {
            return _db.ExecuteDataset(cmdText, p);
        }

        /// <summary>Forwards to <c>PostgresqlDB.GetAllCount</c> and returns its result.</summary>
        /// <param name="cmdText">Command text.</param>
        /// <param name="p">Arguments forwarded unchanged to the database object.</param>
        public int GetAllCount(string cmdText, params object[] p)
        {
            return _db.GetAllCount(cmdText, p);
        }

        /// <summary>
        /// Executes the content of an SQL script file against the database.
        /// Does nothing when the path is null or empty, the file does not exist, or the
        /// file contains no statement text. Any exception is logged and not rethrown.
        /// </summary>
        /// <param name="sqlFile">Path of the script file.</param>
        void PrepareDatabaseTable(string sqlFile)
        {
            // No script supplied, or it is not on disk: nothing to apply.
            if (string.IsNullOrEmpty(sqlFile) || !File.Exists(sqlFile))
            {
                return;
            }

            try
            {
                string sql = File.ReadAllText(sqlFile);

                // An empty script has nothing to execute; skip the database round trip.
                if (string.IsNullOrWhiteSpace(sql))
                {
                    return;
                }

                _db.ExecuteNonQuery(sql);
            }
            catch (Exception ex)
            {
                LOG.WriteExeption(ex);
            }
        }
    }
}