// mh_sms/APT.BaseData.Domain/IServices/IInfluxDataBase/InfluxDataBaseService.cs
// Repository-viewer metadata captured with this file: 279 lines, 9.5 KiB, C#; timestamp shown 2024-04-12 16:50:28 +08:00.
using APT.BaseData.Domain.IServices;
using APT.Infrastructure.Core;
using System;
using System.Collections.Generic;
using System.Text;
using InfluxData.Net.InfluxDb.Models;
using System.Threading.Tasks;
using System.Linq;
using APT.Infrastructure.Api;
namespace APT.BaseData.Domain.Base
{
public abstract class InfluxDataBaseService<T>: CommonService, IInfluxDataBaseService<T>
{
/// <summary>
/// Creates the service and forwards <paramref name="repository"/> to the
/// <c>CommonService</c> base-class constructor. No other initialization is done here.
/// </summary>
/// <param name="repository">Repository instance handed to the base class.</param>
public InfluxDataBaseService(IRepository repository) : base(repository)
{
}
/// <summary>
/// Database name.
/// </summary>
/// <returns>
/// The name that this class passes as the database argument to every write and
/// query call it issues through the InfluxDB client.
/// </returns>
protected abstract string InfluxDbName();
/// <summary>
/// Builds the data to write: converts the given records into the points
/// that <c>SaveData</c> sends to the database.
/// </summary>
/// <param name="data">Records to convert.</param>
/// <returns>The points to be written.</returns>
protected abstract IEnumerable<Point> BuildPoints(List<T> data);
/// <summary>
/// Builds a model from data that was read. The query methods of this class call it
/// once for every row of values in a returned series.
/// </summary>
/// <param name="list">
/// The values of a single result row.
/// NOTE(review): presumably ordered like the series' columns - confirm in implementations.
/// </param>
/// <returns>The model built from that row.</returns>
protected abstract T BuildModel(List<object> list);
/// <summary>
/// Saves data: converts the records to points and writes them in batches of 1000.
/// </summary>
/// <param name="data">Records to persist.</param>
/// <returns>
/// <c>true</c> when every batch was written successfully; <c>false</c> when
/// <paramref name="data"/> is null or empty, or when at least one batch was rejected.
/// </returns>
/// <remarks>
/// Any exception raised while building or writing the points is logged and then rethrown.
/// </remarks>
public async Task<bool> SaveData(List<T> data)
{
    const int pageLimit = 1000;
    try
    {
        if (data == null || data.Count == 0)
            return false;

        // Materialize once: BuildPoints may be lazily evaluated, and counting/skipping
        // it again for every batch would rebuild all points on each iteration.
        var points = BuildPoints(data).ToList();
        var allWritten = true;

        for (var offset = 0; offset < points.Count; offset += pageLimit)
        {
            var batch = points.GetRange(offset, Math.Min(pageLimit, points.Count - offset));
            var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(batch, InfluxDbName());
            if (!response.Success)
            {
                // ThrowError only logs, so the failure has to be recorded explicitly.
                // The remaining batches are still attempted, as before.
                ThrowError("写入时序数据库失败!");
                allWritten = false;
            }
        }

        return allWritten;
    }
    catch (Exception ex)
    {
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        throw;
    }
}
/// <summary>
/// Gets the data of several measurements in one round trip: one query per id is
/// built and all of them are sent together as a multi-query.
/// </summary>
/// <param name="ids">Measurement ids to read. Null or empty yields an empty result.</param>
/// <param name="st">Inclusive lower time bound.</param>
/// <param name="et">Exclusive upper time bound; <c>null</c> means no upper bound.</param>
/// <returns>
/// The models built from every returned row, in the order the results are returned;
/// an empty list when nothing matches.
/// </returns>
public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
{
    var points = new List<T>();
    if (ids == null || ids.Count == 0)
        return points;

    // Same adjustment as the single-id overload, so both overloads build the same
    // lower bound for an unset start time.
    if (st == DateTime.MinValue)
        st = st.AddYears(2000);

    // Format with the invariant culture: with the current culture the calendar and
    // the time separator may differ, which would corrupt the query text.
    const string timeFormat = "yyyy-MM-dd HH:mm:ss";
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    var lowerBound = st.ToString(timeFormat, invariant);
    var upperClause = et == null
        ? ""
        : $"and time < '{et.GetValueOrDefault().ToString(timeFormat, invariant)}' ";

    var query = new List<string>(ids.Count);
    foreach (var id in ids)
    {
        query.Add($" SELECT * FROM \"{id}\" WHERE time >= '{lowerBound}' " + upperClause + "order by time");
    }

    var response = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(query, InfluxDbName());
    if (response == null)
        return points;

    // One result per query, each holding the series that query produced.
    foreach (var result in response)
    {
        foreach (var serie in result)
        {
            foreach (var row in serie.Values)
            {
                // Convert the raw row into a model.
                points.Add(BuildModel(row.ToList()));
            }
        }
    }
    return points;
}
/// <summary>
/// Gets the data of one measurement within a time window, ordered by time.
/// </summary>
/// <param name="id">Measurement id.</param>
/// <param name="st">
/// Inclusive lower time bound. <see cref="DateTime.MinValue"/> is moved forward by
/// 2000 years before the query is built.
/// </param>
/// <param name="et">Exclusive upper time bound; <c>null</c> means no upper bound.</param>
/// <returns>
/// The models built from the rows of the first returned series; an empty list when
/// the query returns nothing.
/// </returns>
public async Task<List<T>> GetDatas(Guid id, DateTime st, DateTime? et)
{
    if (st == DateTime.MinValue)
        st = st.AddYears(2000);

    var points = new List<T>();

    // Format with the invariant culture: with the current culture the calendar and
    // the time separator may differ, which would corrupt the query text.
    const string timeFormat = "yyyy-MM-dd HH:mm:ss";
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    var upperClause = et == null
        ? ""
        : $"and time < '{et.GetValueOrDefault().ToString(timeFormat, invariant)}' ";
    var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString(timeFormat, invariant)}' " + upperClause + "order by time";

    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    // Materialize once instead of enumerating the response for Any() and again for ToList().
    var series = response?.ToList();
    if (series == null || series.Count == 0)
        return points;

    // A single statement was sent, so only the first series is read.
    foreach (var row in series[0].Values)
    {
        points.Add(BuildModel(row.ToList()));
    }
    return points;
}
/// <summary>
/// Clears data: issues a delete for the whole measurement named after <paramref name="id"/>.
/// </summary>
/// <param name="id">Measurement id.</param>
/// <returns>
/// <c>true</c> when the delete statement was sent without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id)
{
    try
    {
        var queries = $"delete FROM \"{id}\"";
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the best-effort contract (return false), but leave a trace of the
        // failure instead of discarding the exception silently.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
/// <summary>
/// Deletes data: removes the rows of one measurement whose time equals
/// <paramref name="time"/> (compared at one-second resolution, as formatted in the query).
/// </summary>
/// <param name="id">Measurement id.</param>
/// <param name="time">Timestamp of the rows to delete.</param>
/// <returns>
/// <c>true</c> when the delete statement was sent without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id, DateTime time)
{
    try
    {
        // Invariant culture keeps the timestamp text independent of the server's
        // regional settings (calendar, time separator).
        var timestamp = time.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
        var queries = $"delete FROM \"{id}\" WHERE \"time\" = '{timestamp}'";
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the best-effort contract (return false), but leave a trace of the
        // failure instead of discarding the exception silently.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
/// <summary>
/// Deletes the data of one measurement inside a time interval.
/// </summary>
/// <param name="id">Measurement id.</param>
/// <param name="st">Inclusive lower time bound; <c>null</c> means no lower bound.</param>
/// <param name="et">Exclusive upper time bound.</param>
/// <returns>
/// <c>true</c> when the delete statement was sent without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id, DateTime? st, DateTime et)
{
    try
    {
        // Invariant culture keeps the timestamp text independent of the server's
        // regional settings (calendar, time separator).
        const string timeFormat = "yyyy-MM-dd HH:mm:ss";
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var lowerClause = st == null
            ? ""
            : $"and \"time\" >= '{st.GetValueOrDefault().ToString(timeFormat, invariant)}' ";
        var queries = $"delete FROM \"{id}\" WHERE \"time\" < '{et.ToString(timeFormat, invariant)}' " + lowerClause;
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the best-effort contract (return false), but leave a trace of the
        // failure instead of discarding the exception silently.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
/// <summary>
/// Gets the last record of a point: the most recent row strictly before <paramref name="time"/>.
/// </summary>
/// <param name="id">Measurement id.</param>
/// <param name="time">Exclusive upper time bound.</param>
/// <returns>
/// The model built from that row, or <c>default(T)</c> when the query returns no series
/// or a series without rows.
/// </returns>
public async Task<T> GetLastData(Guid id, DateTime time)
{
    // Invariant culture keeps the timestamp text independent of the server's
    // regional settings (calendar, time separator).
    var timestamp = time.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
    var queries = $" SELECT * FROM \"{id}\" WHERE time< '{timestamp}' order by time desc limit 1";
    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    var series = response?.ToList();
    if (series == null || series.Count == 0)
        return default(T);

    // A single statement was sent, so only the first series is read. Guard against a
    // series without rows: FirstOrDefault() returns null then, and calling ToList()
    // on it would throw.
    var row = series[0].Values?.FirstOrDefault();
    if (row == null)
        return default(T);

    return BuildModel(row.ToList());
}
/// <summary>
/// Gets the first record of a point: the earliest row of the measurement.
/// </summary>
/// <param name="id">Measurement id.</param>
/// <returns>
/// The model built from that row, or <c>default(T)</c> when the query returns no series
/// or a series without rows.
/// </returns>
public async Task<T> GetFirstData(Guid id)
{
    var queries = $" SELECT * FROM \"{id}\" order by time limit 1";
    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    var series = response?.ToList();
    if (series == null || series.Count == 0)
        return default(T);

    // A single statement was sent, so only the first series is read. Guard against a
    // series without rows: FirstOrDefault() returns null then, and calling ToList()
    // on it would throw.
    var row = series[0].Values?.FirstOrDefault();
    if (row == null)
        return default(T);

    return BuildModel(row.ToList());
}
/// <summary>
/// Writes an error log entry: hands <paramref name="str"/> to the <c>Error</c> method of
/// the logger obtained from <c>LoggerManager</c>. Despite its name, this method does not
/// throw; callers that need to abort must do so themselves.
/// </summary>
/// <param name="str">Message to log.</param>
public void ThrowError(string str)
{
    var logger = LoggerManager.GetLogger();
    logger.Error(str);
}
}
}