using APT.BaseData.Domain.IServices; using APT.Infrastructure.Core; using System; using System.Collections.Generic; using System.Text; using InfluxData.Net.InfluxDb.Models; using System.Threading.Tasks; using System.Linq; using APT.Infrastructure.Api; namespace APT.BaseData.Domain.Base { public abstract class InfluxDataBaseService: CommonService, IInfluxDataBaseService { public InfluxDataBaseService(IRepository repository) : base(repository) { } /// /// 数据库名称 /// /// protected abstract string InfluxDbName(); /// /// 构建写入数据 /// /// /// protected abstract IEnumerable BuildPoints(List data); /// /// 构建读取数据 /// /// /// protected abstract T BuildModel(List list); /// /// 保存数据 /// /// /// public async Task SaveData(List data) { try { if (!data.Any()) return false; var points = BuildPoints(data); var pageLimit = 1000; var page = (points.Count() / pageLimit) + 1; for (var x = 0; x < points.Count(); x = x + pageLimit) { var pageList = points.Skip(x).Take(pageLimit); var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(pageList, InfluxDbName()); if (!response.Success) { ThrowError("写入时序数据库失败!"); } } return true; } catch (Exception ex) { ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message); throw; } } /// /// 获取数据 /// /// /// /// /// //public async Task> GetDatas(List ids, DateTime st, DateTime? et) //{ // var points = new List(); // foreach (var item in ids) // { // var list = await GetDatas(item, st, et); // if (list.Any()) // points.AddRange(list); // } // return points; //} public async Task> GetDatas(List ids, DateTime st, DateTime? et) { var points = new List(); if (ids.Count == 0) return points; try { var query = new List(); foreach (var id in ids) { var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString("yyyy-MM-dd HH:mm:ss")}' " + (et == null ? "" : $"and time < '{et.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' ") + "order by time"; query.Add(queries); } var response = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(query, InfluxDbName()); if (!response.Any()) return points; //得到Serie集合对象(返回执行多个查询的结果) var series = response.ToList(); //取出第一条命令的查询结果,是一个集合 if (!series.Any()) return points; foreach (var list in series) { foreach (var data in list) { foreach (var item in data.Values) { //取原始数据 points.Add(BuildModel(item.ToList())); } } } } catch (Exception ex) { throw; } return points; } /// /// 获取数据 /// /// /// /// /// public async Task> GetDatas(Guid id, DateTime st, DateTime? et) { if (st == DateTime.MinValue) st = st.AddYears(2000); var points = new List(); var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString("yyyy-MM-dd HH:mm:ss")}' " + (et == null ? "" : $"and time < '{et.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' ") + "order by time"; var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); if (!response.Any()) return points; //得到Serie集合对象(返回执行多个查询的结果) var series = response.ToList(); if (!series.Any()) return points; //取出第一条命令的查询结果,是一个集合 var list = series[0].Values; //从集合中取出第一条数据 foreach (var item in list) { points.Add(BuildModel(item.ToList())); } return points; } /// /// 清空数据 /// /// /// public async Task DeleteData(Guid id) { try { var queries = $"delete FROM \"{id}\""; var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); return true; } catch (Exception e) { return false; } } /// /// 删除数据 /// /// /// /// public async Task DeleteData(Guid id, DateTime time) { try { var queries = $"delete FROM \"{id}\" WHERE \"time\" = '{time.ToString("yyyy-MM-dd HH:mm:ss")}'"; var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); return true; } catch (Exception e) { return false; } } /// /// 删除时间区间内数据 /// /// /// /// /// public async Task DeleteData(Guid id, DateTime? st, DateTime et) { try { var queries = $"delete FROM \"{id}\" WHERE \"time\" < '{et.ToString("yyyy-MM-dd HH:mm:ss")}' " + (st == null ? "" : $"and \"time\" >= '{st.GetValueOrDefault().ToString("yyyy-MM-dd HH:mm:ss")}' "); var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); return true; } catch (Exception e) { return false; } } /// /// 获取点最后一条数据。 /// /// /// /// public async Task GetLastData(Guid id, DateTime time) { var queries = $" SELECT * FROM \"{id}\" WHERE time< '{time.ToString("yyyy-MM-dd HH:mm:ss")}' order by time desc limit 1"; var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); if (!response.Any()) return default(T); //得到Serie集合对象(返回执行多个查询的结果) var series = response.ToList(); //取出第一条命令的查询结果,是一个集合 var item = series[0].Values.FirstOrDefault().ToList(); return BuildModel(item); } /// /// 获取点第一个条数据。 /// /// /// public async Task GetFirstData(Guid id) { var queries = $" SELECT * FROM \"{id}\" order by time limit 1"; var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName()); if (!response.Any()) return default(T); //得到Serie集合对象(返回执行多个查询的结果) var series = response.ToList(); //取出第一条命令的查询结果,是一个集合 var list = series[0].Values.FirstOrDefault().ToList(); return BuildModel(list); } /// /// 输出错误日志 /// /// public void ThrowError(string str) { LoggerManager.GetLogger().Error(str); } } }