PostgresqlDB.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using Npgsql;
  6. using Aitex.Core.RT.Log;
  7. using System.Data;
  8. using Aitex.Core.Utilities;
  9. namespace Aitex.Core.RT.DBCore
  10. {
  11. public enum DataBaseStatus
  12. {
  13. Close,
  14. Open,
  15. Error
  16. }
  17. public class PostgresqlDB
  18. {
  19. NpgsqlConnection _conn;
  20. string _connectionString;
  21. string _dbName;
  22. Retry _retryConnection = new Retry();
  23. private int _closeMaxTime = 4;
  24. private int _closeTime = 0;
  25. public DataBaseStatus DBStatus
  26. {
  27. get
  28. {
  29. //超过允许异常关闭的次数
  30. if (_closeTime >= _closeMaxTime)
  31. return DataBaseStatus.Error;
  32. else
  33. return _conn.State == ConnectionState.Open ? DataBaseStatus.Open : DataBaseStatus.Close;
  34. }
  35. }
  36. public PostgresqlDB()
  37. {
  38. }
  39. public bool Open(string connectionString, string dbName)
  40. {
  41. _connectionString = connectionString;
  42. _dbName = dbName;
  43. bool result = true;
  44. try
  45. {
  46. if (_conn != null)
  47. _conn.Close();
  48. _conn = new NpgsqlConnection(connectionString);
  49. _conn.Open();
  50. if (!string.IsNullOrEmpty(dbName))
  51. CreateDBIfNotExisted(dbName);
  52. }
  53. catch (Exception ex)
  54. {
  55. if (_conn != null)
  56. {
  57. _conn.Close();
  58. _conn = null;
  59. }
  60. result = false;
  61. LOG.WriteExeption(ex);
  62. }
  63. _retryConnection.Result = result;
  64. //if (_retryConnection.IsErrored)
  65. return result;
  66. }
  67. bool Open()
  68. {
  69. return Open(_connectionString, _dbName);
  70. }
  71. void PrepareCommand(NpgsqlCommand cmd, NpgsqlConnection conn, string cmdText, params object[] p)
  72. {
  73. cmd.Parameters.Clear();
  74. cmd.Connection = conn;
  75. cmd.CommandText = cmdText;
  76. cmd.CommandType = CommandType.Text;
  77. if (p != null)
  78. {
  79. foreach (object parm in p)
  80. cmd.Parameters.AddWithValue(string.Empty, parm);
  81. }
  82. }
  83. public int ExecuteNonQuery(string cmdText, params object[] p)
  84. {
  85. try
  86. {
  87. using (NpgsqlCommand command = new NpgsqlCommand())
  88. {
  89. PrepareCommand(command, _conn, cmdText, p);
  90. return command.ExecuteNonQuery();
  91. }
  92. }
  93. catch
  94. {
  95. _closeTime++;
  96. Close();
  97. throw;
  98. }
  99. }
  100. public DataSet ExecuteDataset(string cmdText, params object[] p)
  101. {
  102. try
  103. {
  104. DataSet ds = new DataSet();
  105. using (var connection = new NpgsqlConnection(_connectionString))
  106. {
  107. connection.Open();
  108. connection.ChangeDatabase(_dbName);
  109. using (NpgsqlCommand command = new NpgsqlCommand())
  110. {
  111. try
  112. {
  113. PrepareCommand(command, connection, cmdText, p);
  114. NpgsqlDataAdapter da = new NpgsqlDataAdapter(command);
  115. da.Fill(ds);
  116. }
  117. catch (Exception ex)
  118. {
  119. LOG.WriteExeption("执行查询出错," + cmdText, ex);
  120. }
  121. }
  122. }
  123. return ds;
  124. }
  125. catch (Exception ex)
  126. {
  127. LOG.WriteExeption("执行查询出错," + cmdText, ex);
  128. }
  129. return null;
  130. }
  131. public NpgsqlDataReader ExecuteReader(string cmdText, params object[] p)
  132. {
  133. try
  134. {
  135. using (NpgsqlCommand command = new NpgsqlCommand())
  136. {
  137. PrepareCommand(command, _conn, cmdText, p);
  138. return command.ExecuteReader(CommandBehavior.CloseConnection);
  139. }
  140. }
  141. catch
  142. {
  143. Close();
  144. throw;
  145. }
  146. }
  147. public bool ActiveConnection()
  148. {
  149. if (_conn != null && _conn.State == ConnectionState.Open)
  150. return true;
  151. return Open();
  152. }
  153. public void Close()
  154. {
  155. try
  156. {
  157. if (_conn != null)
  158. _conn.Close();
  159. _conn = null;
  160. }
  161. catch (Exception ex)
  162. {
  163. LOG.WriteExeption(ex);
  164. }
  165. }
  166. public void CreateDBIfNotExisted(string db)
  167. {
  168. NpgsqlDataReader reader =
  169. ExecuteReader(string.Format(@"select datname from pg_catalog.pg_database where datname='{0}'", db));
  170. if (!reader.HasRows)
  171. {
  172. string sql = string.Format(@"
  173. CREATE DATABASE {0}
  174. WITH OWNER = postgres
  175. ENCODING = 'UTF8'
  176. TABLESPACE = pg_default
  177. CONNECTION LIMIT = -1", db);
  178. ExecuteNonQuery(sql);
  179. }
  180. try
  181. {
  182. _conn.ChangeDatabase(db);
  183. }
  184. catch
  185. {
  186. _conn.Close();
  187. throw;
  188. }
  189. }
  190. public void CreateTableIndexIfNotExisted(string table, string index, string sql)
  191. {
  192. NpgsqlDataReader reader = ExecuteReader($"select* from pg_indexes where tablename='{table}' and indexname = '{index}'");
  193. if (!reader.HasRows)
  194. {
  195. ExecuteNonQuery(sql);
  196. }
  197. }
  198. public void CreateTableIfNotExisted(string table, Dictionary<string, Type> columns, bool addPID,
  199. string primaryKey)
  200. {
  201. NpgsqlDataReader reader =
  202. ExecuteReader(
  203. string.Format(@"select column_name from information_schema.columns where table_name = '{0}'",
  204. table));
  205. if (!reader.HasRows)
  206. {
  207. string cols = addPID ? " \"PID\" serial NOT NULL," : "";
  208. foreach (var item in columns)
  209. {
  210. if (item.Value == typeof(int))
  211. cols += string.Format("\"{0}\" integer,", item.Key);
  212. else if (item.Value == typeof(string))
  213. cols += string.Format("\"{0}\" text,", item.Key);
  214. else if (item.Value == typeof(DateTime))
  215. cols += string.Format("\"{0}\" timestamp without time zone,", item.Key);
  216. else if (item.Value == typeof(bool))
  217. cols += string.Format("\"{0}\" boolean,", item.Key);
  218. else if (item.Value == typeof(float))
  219. cols += string.Format("\"{0}\" real,", item.Key);
  220. }
  221. if (string.IsNullOrEmpty(primaryKey))
  222. {
  223. if (cols.LastIndexOf(',') == cols.Length - 1)
  224. cols = cols.Remove(cols.Length - 1);
  225. }
  226. else
  227. {
  228. cols += string.Format("CONSTRAINT \"{0}_pkey\" PRIMARY KEY (\"{1}\" )", table, primaryKey);
  229. }
  230. ExecuteNonQuery(string.Format("CREATE TABLE \"{0}\"({1})WITH ( OIDS=FALSE );", table, cols));
  231. }
  232. else
  233. {
  234. CreateTableColumn(table, columns);
  235. }
  236. }
  237. public void CreateTableColumn(string tableName, Dictionary<string, Type> columns)
  238. {
  239. try
  240. {
  241. //query if table already exist?
  242. string sqlTblDefine =
  243. string.Format("select column_name from information_schema.columns where table_name = '{0}';",
  244. tableName);
  245. NpgsqlCommand cmdTblDefine = new NpgsqlCommand(sqlTblDefine, _conn);
  246. var tblDefineData = cmdTblDefine.ExecuteReader();
  247. string tblAlertString = string.Empty;
  248. List<string> colNameList = new List<string>();
  249. while (tblDefineData.Read())
  250. {
  251. for (int i = 0; i < tblDefineData.FieldCount; i++)
  252. colNameList.Add(tblDefineData[i].ToString());
  253. }
  254. tblDefineData.Close();
  255. if (colNameList.Count > 0)
  256. {
  257. //table exist
  258. foreach (var column in columns)
  259. {
  260. if (!colNameList.Contains(column.Key))
  261. {
  262. if (column.Value == typeof(Boolean))
  263. {
  264. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};",
  265. tableName, column.Key, "Boolean");
  266. }
  267. else if (column.Value == typeof(double) || column.Value == typeof(float))
  268. {
  269. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};",
  270. tableName, column.Key, "real");
  271. }
  272. else if (column.Value == typeof(DateTime))
  273. {
  274. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};",
  275. tableName, column.Key, "timestamp without time zone");
  276. }
  277. else if (column.Value == typeof(int) || column.Value == typeof(ushort) ||
  278. column.Value == typeof(short))
  279. {
  280. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};",
  281. tableName, column.Key, "integer");
  282. }
  283. else
  284. {
  285. tblAlertString += string.Format("ALTER TABLE \"{0}\" ADD COLUMN \"{1}\" {2};",
  286. tableName, column.Key, "text");
  287. }
  288. }
  289. }
  290. if (!string.IsNullOrEmpty(tblAlertString))
  291. {
  292. try
  293. {
  294. NpgsqlCommand alertTblCmd = new NpgsqlCommand(tblAlertString, _conn);
  295. alertTblCmd.ExecuteNonQuery();
  296. }
  297. catch (Exception ex)
  298. {
  299. LOG.WriteExeption(ex);
  300. }
  301. }
  302. }
  303. }
  304. catch (Exception ex)
  305. {
  306. LOG.WriteExeption(ex);
  307. }
  308. }
  309. }
  310. }