279 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			279 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
using APT.BaseData.Domain.IServices;
 | 
						||
using APT.Infrastructure.Core;
 | 
						||
using System;
 | 
						||
using System.Collections.Generic;
 | 
						||
using System.Text;
 | 
						||
using InfluxData.Net.InfluxDb.Models;
 | 
						||
using System.Threading.Tasks;
 | 
						||
using System.Linq;
 | 
						||
using APT.Infrastructure.Api;
 | 
						||
namespace APT.BaseData.Domain.Base
 | 
						||
{
 | 
						||
    public abstract class InfluxDataBaseService<T>: CommonService, IInfluxDataBaseService<T>
 | 
						||
    {
 | 
						||
        public InfluxDataBaseService(IRepository repository) : base(repository)
 | 
						||
        {
 | 
						||
        }
 | 
						||
        /// <summary>
 | 
						||
        /// 数据库名称
 | 
						||
        /// </summary>
 | 
						||
        /// <returns></returns>
 | 
						||
        protected abstract string InfluxDbName();
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 构建写入数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="data"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        protected abstract IEnumerable<Point> BuildPoints(List<T> data);
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 构建读取数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="list"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        protected abstract T BuildModel(List<object> list);
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 保存数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="data"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<bool> SaveData(List<T> data)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                if (!data.Any())
 | 
						||
                    return false;
 | 
						||
                var points = BuildPoints(data);
 | 
						||
                var pageLimit = 1000;
 | 
						||
                var page = (points.Count() / pageLimit) + 1;
 | 
						||
                for (var x = 0; x < points.Count(); x = x + pageLimit)
 | 
						||
                {
 | 
						||
                    var pageList = points.Skip(x).Take(pageLimit);
 | 
						||
                    var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(pageList, InfluxDbName());
 | 
						||
                    if (!response.Success)
 | 
						||
                    {
 | 
						||
                        ThrowError("写入时序数据库失败!");
 | 
						||
                    }
 | 
						||
                }
 | 
						||
 | 
						||
                return true;
 | 
						||
            }
 | 
						||
            catch (Exception ex)
 | 
						||
            {
 | 
						||
                ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
 | 
						||
                throw;
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 获取数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="ids"></param>
 | 
						||
        /// <param name="st"></param>
 | 
						||
        /// <param name="et"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        //public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
 | 
						||
        //{
 | 
						||
        //    var points = new List<T>();
 | 
						||
        //    foreach (var item in ids)
 | 
						||
        //    {
 | 
						||
        //        var list = await GetDatas(item, st, et);
 | 
						||
        //        if (list.Any())
 | 
						||
        //            points.AddRange(list);
 | 
						||
        //    }
 | 
						||
 | 
						||
        //    return points;
 | 
						||
        //}
 | 
						||
 | 
						||
 | 
						||
        public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
 | 
						||
        {
 | 
						||
            var points = new List<T>();
 | 
						||
            if (ids.Count == 0)
 | 
						||
                return points;
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var query = new List<string>();
 | 
						||
                foreach (var id in ids)
 | 
						||
                {
 | 
						||
                    var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString("yyyy-MM-dd HH:mm:ss")}' " + (et == null ? ""
 | 
						||
                                  : $"and time < '{et.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' ") + "order by time";
 | 
						||
                    query.Add(queries);
 | 
						||
                }
 | 
						||
                var response = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(query, InfluxDbName());
 | 
						||
                if (!response.Any())
 | 
						||
                    return points;
 | 
						||
 | 
						||
                //得到Serie集合对象(返回执行多个查询的结果)
 | 
						||
                var series = response.ToList();
 | 
						||
                //取出第一条命令的查询结果,是一个集合
 | 
						||
                if (!series.Any())
 | 
						||
                    return points;
 | 
						||
                foreach (var list in series)
 | 
						||
                {
 | 
						||
                    foreach (var data in list)
 | 
						||
                    {
 | 
						||
                        foreach (var item in data.Values)
 | 
						||
                        {
 | 
						||
                            //取原始数据
 | 
						||
                            points.Add(BuildModel(item.ToList()));
 | 
						||
                        }
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
            catch (Exception ex)
 | 
						||
            {
 | 
						||
 | 
						||
                throw;
 | 
						||
            }
 | 
						||
           
 | 
						||
          
 | 
						||
            return points;
 | 
						||
        }
 | 
						||
        /// <summary>
 | 
						||
        /// 获取数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <param name="st"></param>
 | 
						||
        /// <param name="et"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<List<T>> GetDatas(Guid id, DateTime st, DateTime? et)
 | 
						||
        {
 | 
						||
            if (st == DateTime.MinValue)
 | 
						||
                st = st.AddYears(2000);
 | 
						||
            var points = new List<T>();
 | 
						||
            var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString("yyyy-MM-dd HH:mm:ss")}' " + (et == null ? "" : $"and time < '{et.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' ") + "order by time";
 | 
						||
            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
            if (!response.Any())
 | 
						||
                return points;
 | 
						||
 | 
						||
            //得到Serie集合对象(返回执行多个查询的结果)
 | 
						||
            var series = response.ToList();
 | 
						||
            if (!series.Any())
 | 
						||
                return points;
 | 
						||
            //取出第一条命令的查询结果,是一个集合
 | 
						||
            var list = series[0].Values;
 | 
						||
 | 
						||
            //从集合中取出第一条数据
 | 
						||
            foreach (var item in list)
 | 
						||
            {
 | 
						||
                points.Add(BuildModel(item.ToList()));
 | 
						||
            }
 | 
						||
 | 
						||
            return points;
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 清空数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<bool> DeleteData(Guid id)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var queries = $"delete FROM \"{id}\"";
 | 
						||
                var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
                return true;
 | 
						||
            }
 | 
						||
            catch (Exception e)
 | 
						||
            {
 | 
						||
                return false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 删除数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <param name="time"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<bool> DeleteData(Guid id, DateTime time)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var queries = $"delete FROM \"{id}\" WHERE \"time\" = '{time.ToString("yyyy-MM-dd HH:mm:ss")}'";
 | 
						||
                var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
                return true;
 | 
						||
            }
 | 
						||
            catch (Exception e)
 | 
						||
            {
 | 
						||
                return false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 删除时间区间内数据
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <param name="st"></param>
 | 
						||
        /// <param name="et"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<bool> DeleteData(Guid id, DateTime? st, DateTime et)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var queries = $"delete FROM \"{id}\" WHERE \"time\" < '{et.ToString("yyyy-MM-dd HH:mm:ss")}' " + (st == null ? "" : $"and \"time\" >= '{st.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' ");
 | 
						||
                var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
                return true;
 | 
						||
            }
 | 
						||
            catch (Exception e)
 | 
						||
            {
 | 
						||
                return false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        ///  获取点最后一条数据。
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <param name="time"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<T> GetLastData(Guid id, DateTime time)
 | 
						||
        {
 | 
						||
            var queries = $" SELECT * FROM \"{id}\" WHERE time< '{time.ToString("yyyy-MM-dd HH:mm:ss")}' order by time desc limit 1";
 | 
						||
            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
            if (!response.Any())
 | 
						||
                return default(T);
 | 
						||
 | 
						||
            //得到Serie集合对象(返回执行多个查询的结果)
 | 
						||
            var series = response.ToList();
 | 
						||
            //取出第一条命令的查询结果,是一个集合
 | 
						||
            var item = series[0].Values.FirstOrDefault().ToList();
 | 
						||
 | 
						||
            return BuildModel(item);
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 获取点第一个条数据。
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="id"></param>
 | 
						||
        /// <returns></returns>
 | 
						||
        public async Task<T> GetFirstData(Guid id)
 | 
						||
        {
 | 
						||
            var queries = $" SELECT * FROM \"{id}\" order by time limit 1";
 | 
						||
            var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
 | 
						||
            if (!response.Any())
 | 
						||
                return default(T);
 | 
						||
 | 
						||
            //得到Serie集合对象(返回执行多个查询的结果)
 | 
						||
            var series = response.ToList();
 | 
						||
            //取出第一条命令的查询结果,是一个集合
 | 
						||
            var list = series[0].Values.FirstOrDefault().ToList();
 | 
						||
 | 
						||
            return BuildModel(list);
 | 
						||
        }
 | 
						||
 | 
						||
        /// <summary>
 | 
						||
        /// 输出错误日志
 | 
						||
        /// </summary>
 | 
						||
        /// <param name="str"></param>
 | 
						||
        public void ThrowError(string str)
 | 
						||
        {
 | 
						||
            LoggerManager.GetLogger().Error(str);
 | 
						||
        }
 | 
						||
    }
 | 
						||
}
 |