279 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
		
		
			
		
	
	
			279 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| 
								 | 
							
								using APT.BaseData.Domain.IServices;
							 | 
						|||
| 
								 | 
							
								using APT.Infrastructure.Core;
							 | 
						|||
| 
								 | 
							
								using System;
							 | 
						|||
| 
								 | 
							
								using System.Collections.Generic;
							 | 
						|||
| 
								 | 
							
								using System.Text;
							 | 
						|||
| 
								 | 
							
								using InfluxData.Net.InfluxDb.Models;
							 | 
						|||
| 
								 | 
							
								using System.Threading.Tasks;
							 | 
						|||
| 
								 | 
							
								using System.Linq;
							 | 
						|||
| 
								 | 
							
								using APT.Infrastructure.Api;
							 | 
						|||
| 
								 | 
							
								namespace APT.BaseData.Domain.Base
							 | 
						|||
| 
								 | 
							
								{
							 | 
						|||
| 
								 | 
							
								    /// <summary>
								    /// Base class for services that store and query time-series records of type
								    /// <typeparamref name="T"/> through the shared InfluxDB client.
								    /// Queries and deletes address the measurement whose name is the record id (a Guid).
								    /// </summary>
								    /// <typeparam name="T">Model type built from query rows and converted into points for writing.</typeparam>
								    public abstract class InfluxDataBaseService<T>: CommonService, IInfluxDataBaseService<T>
								    {
								        /// <summary>Maximum number of points sent to InfluxDB in a single write request.</summary>
								        private const int WriteBatchSize = 1000;

								        /// <summary>Timestamp layout used in every InfluxQL time literal built by this class.</summary>
								        private const string InfluxTimeFormat = "yyyy-MM-dd HH:mm:ss";

								        /// <summary>
								        /// Creates the service on top of the given repository.
								        /// </summary>
								        /// <param name="repository">Repository forwarded to the <c>CommonService</c> base constructor.</param>
								        public InfluxDataBaseService(IRepository repository) : base(repository)
								        {
								        }

								        /// <summary>
								        /// Name of the InfluxDB database that all reads, writes and deletes target.
								        /// </summary>
								        /// <returns>The database name.</returns>
								        protected abstract string InfluxDbName();

								        /// <summary>
								        /// Converts model instances into InfluxDB points ready to be written.
								        /// </summary>
								        /// <param name="data">Models to convert.</param>
								        /// <returns>The points to write.</returns>
								        protected abstract IEnumerable<Point> BuildPoints(List<T> data);

								        /// <summary>
								        /// Converts one result row returned by a query into a model instance.
								        /// </summary>
								        /// <param name="list">Column values of a single row.</param>
								        /// <returns>The model built from the row.</returns>
								        protected abstract T BuildModel(List<object> list);

								        /// <summary>
								        /// Writes the given models to InfluxDB in batches of <see cref="WriteBatchSize"/> points.
								        /// </summary>
								        /// <param name="data">Models to persist.</param>
								        /// <returns>
								        /// <c>true</c> when every batch was written successfully; <c>false</c> when
								        /// <paramref name="data"/> is null or empty, or when any batch was rejected.
								        /// </returns>
								        public async Task<bool> SaveData(List<T> data)
								        {
								            if (data == null || data.Count == 0)
								            {
								                return false;
								            }

								            try
								            {
								                // Materialize once: BuildPoints may be a lazy sequence, and it is both
								                // counted and sliced below.
								                var points = BuildPoints(data).ToList();
								                var allWritten = true;

								                for (var offset = 0; offset < points.Count; offset += WriteBatchSize)
								                {
								                    var batch = points.Skip(offset).Take(WriteBatchSize);
								                    var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(batch, InfluxDbName());
								                    if (!response.Success)
								                    {
								                        // ThrowError only logs, so the failure has to be tracked explicitly;
								                        // remaining batches are still attempted.
								                        ThrowError("写入时序数据库失败!");
								                        allWritten = false;
								                    }
								                }

								                return allWritten;
								            }
								            catch (Exception ex)
								            {
								                ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
								                throw;
								            }
								        }

								        /// <summary>
								        /// Reads the rows of several measurements within a time range using one multi-query request.
								        /// </summary>
								        /// <param name="ids">Ids of the measurements to read.</param>
								        /// <param name="st">Inclusive lower time bound.</param>
								        /// <param name="et">Exclusive upper time bound, or null for no upper bound.</param>
								        /// <returns>All rows of all returned series, converted with <see cref="BuildModel"/>; empty when <paramref name="ids"/> is null or empty.</returns>
								        public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
								        {
								            var points = new List<T>();
								            if (ids == null || ids.Count == 0)
								            {
								                return points;
								            }

								            var queries = ids.Select(id => BuildRangeQuery(id, st, et)).ToList();
								            var response = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(queries, InfluxDbName());

								            // One entry per query, each holding the series that query returned.
								            foreach (var queryResult in response)
								            {
								                foreach (var serie in queryResult)
								                {
								                    foreach (var row in serie.Values)
								                    {
								                        points.Add(BuildModel(row.ToList()));
								                    }
								                }
								            }

								            return points;
								        }

								        /// <summary>
								        /// Reads the rows of one measurement within a time range.
								        /// </summary>
								        /// <param name="id">Id of the measurement to read.</param>
								        /// <param name="st">Inclusive lower time bound.</param>
								        /// <param name="et">Exclusive upper time bound, or null for no upper bound.</param>
								        /// <returns>The rows of the first returned series, converted with <see cref="BuildModel"/>; empty when nothing is returned.</returns>
								        public async Task<List<T>> GetDatas(Guid id, DateTime st, DateTime? et)
								        {
								            var points = new List<T>();
								            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(BuildRangeQuery(id, st, et), InfluxDbName());

								            // A single query is sent, so only the first series is read.
								            var firstSerie = response.FirstOrDefault();
								            if (firstSerie?.Values == null)
								            {
								                return points;
								            }

								            foreach (var row in firstSerie.Values)
								            {
								                points.Add(BuildModel(row.ToList()));
								            }

								            return points;
								        }

								        /// <summary>
								        /// Deletes every row of a measurement.
								        /// </summary>
								        /// <param name="id">Id of the measurement to clear.</param>
								        /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c> (the error is logged).</returns>
								        public async Task<bool> DeleteData(Guid id)
								        {
								            try
								            {
								                await InfluxDbHelper.InfluxDbClient.Client.QueryAsync($"delete FROM \"{id}\"", InfluxDbName());
								                return true;
								            }
								            catch (Exception ex)
								            {
								                ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
								                return false;
								            }
								        }

								        /// <summary>
								        /// Deletes the rows of a measurement whose timestamp equals the given time.
								        /// </summary>
								        /// <param name="id">Id of the measurement.</param>
								        /// <param name="time">Timestamp to delete (second resolution in the generated statement).</param>
								        /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c> (the error is logged).</returns>
								        public async Task<bool> DeleteData(Guid id, DateTime time)
								        {
								            try
								            {
								                var statement = $"delete FROM \"{id}\" WHERE \"time\" = '{FormatTime(time)}'";
								                await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(statement, InfluxDbName());
								                return true;
								            }
								            catch (Exception ex)
								            {
								                ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
								                return false;
								            }
								        }

								        /// <summary>
								        /// Deletes the rows of a measurement that fall inside a time range.
								        /// </summary>
								        /// <param name="id">Id of the measurement.</param>
								        /// <param name="st">Inclusive lower time bound, or null for no lower bound.</param>
								        /// <param name="et">Exclusive upper time bound.</param>
								        /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c> (the error is logged).</returns>
								        public async Task<bool> DeleteData(Guid id, DateTime? st, DateTime et)
								        {
								            try
								            {
								                var lowerBound = st.HasValue ? $"and \"time\" >= '{FormatTime(st.Value)}' " : "";
								                var statement = $"delete FROM \"{id}\" WHERE \"time\" < '{FormatTime(et)}' " + lowerBound;
								                await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(statement, InfluxDbName());
								                return true;
								            }
								            catch (Exception ex)
								            {
								                ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
								                return false;
								            }
								        }

								        /// <summary>
								        /// Gets the most recent row of a measurement that is strictly earlier than the given time.
								        /// </summary>
								        /// <param name="id">Id of the measurement.</param>
								        /// <param name="time">Exclusive upper time bound.</param>
								        /// <returns>The converted row, or <c>default(T)</c> when no row matches.</returns>
								        public async Task<T> GetLastData(Guid id, DateTime time)
								        {
								            var query = $" SELECT * FROM \"{id}\" WHERE time< '{FormatTime(time)}' order by time desc limit 1";
								            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(query, InfluxDbName());

								            // Guard every level: no series, no value table, or an empty value table.
								            var row = response.FirstOrDefault()?.Values?.FirstOrDefault();
								            if (row == null)
								            {
								                return default(T);
								            }

								            return BuildModel(row.ToList());
								        }

								        /// <summary>
								        /// Gets the earliest row of a measurement.
								        /// </summary>
								        /// <param name="id">Id of the measurement.</param>
								        /// <returns>The converted row, or <c>default(T)</c> when the measurement has no rows.</returns>
								        public async Task<T> GetFirstData(Guid id)
								        {
								            var query = $" SELECT * FROM \"{id}\" order by time limit 1";
								            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(query, InfluxDbName());

								            // Guard every level: no series, no value table, or an empty value table.
								            var row = response.FirstOrDefault()?.Values?.FirstOrDefault();
								            if (row == null)
								            {
								                return default(T);
								            }

								            return BuildModel(row.ToList());
								        }

								        /// <summary>
								        /// Writes a message to the error log. Despite its name, this method does not throw.
								        /// </summary>
								        /// <param name="str">Message to log.</param>
								        public void ThrowError(string str)
								        {
								            LoggerManager.GetLogger().Error(str);
								        }

								        /// <summary>
								        /// Formats a timestamp for use inside an InfluxQL time literal.
								        /// The invariant culture keeps the ':' separators and the Gregorian calendar
								        /// regardless of the culture of the current thread.
								        /// </summary>
								        /// <param name="value">Timestamp to format.</param>
								        /// <returns>The timestamp as <c>yyyy-MM-dd HH:mm:ss</c>.</returns>
								        private static string FormatTime(DateTime value)
								        {
								            // NOTE(review): no offset is emitted, so the server presumably treats the value as UTC - confirm callers pass UTC.
								            return value.ToString(InfluxTimeFormat, System.Globalization.CultureInfo.InvariantCulture);
								        }

								        /// <summary>
								        /// Builds the time-ordered SELECT statement shared by both <c>GetDatas</c> overloads.
								        /// </summary>
								        /// <param name="id">Id of the measurement to read.</param>
								        /// <param name="st">Inclusive lower time bound.</param>
								        /// <param name="et">Exclusive upper time bound, or null for no upper bound.</param>
								        /// <returns>The InfluxQL query text.</returns>
								        private static string BuildRangeQuery(Guid id, DateTime st, DateTime? et)
								        {
								            // An unset start time is moved to year 2001, the guard the single-id overload
								            // always applied; presumably the server rejects year-1 timestamps - TODO confirm.
								            if (st == DateTime.MinValue)
								            {
								                st = st.AddYears(2000);
								            }

								            var upperBound = et.HasValue ? $"and time < '{FormatTime(et.Value)}' " : "";
								            return $" SELECT * FROM \"{id}\" WHERE time >= '{FormatTime(st)}' " + upperBound + "order by time";
								        }
								    }
							 | 
						|||
| 
								 | 
							
								}
							 |