mh_sms/APT.BaseData.Domain/IServices/IInfluxDataBase/InfluxDataBaseService.cs
2024-04-12 16:50:28 +08:00

279 lines
9.5 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using APT.BaseData.Domain.IServices;
using APT.Infrastructure.Api;
using APT.Infrastructure.Core;
using InfluxData.Net.InfluxDb.Models;
namespace APT.BaseData.Domain.Base
{
public abstract class InfluxDataBaseService<T>: CommonService, IInfluxDataBaseService<T>
{
    /// <summary>
    /// Layout of every time literal embedded in a query string.
    /// </summary>
    private const string QueryTimeFormat = "yyyy-MM-dd HH:mm:ss";

    /// <summary>
    /// Maximum number of points sent to the database in one write request.
    /// </summary>
    private const int WriteBatchSize = 1000;

    public InfluxDataBaseService(IRepository repository) : base(repository)
    {
    }

    /// <summary>
    /// Name of the database that every read, write and delete in this service targets.
    /// </summary>
    /// <returns>The database name passed to the InfluxDB client.</returns>
    protected abstract string InfluxDbName();

    /// <summary>
    /// Converts the models to be saved into the points that are written to the database.
    /// </summary>
    /// <param name="data">Models to persist; never null or empty when called from <see cref="SaveData"/>.</param>
    /// <returns>The points to write.</returns>
    protected abstract IEnumerable<Point> BuildPoints(List<T> data);

    /// <summary>
    /// Converts one result row returned by a query into a model.
    /// </summary>
    /// <param name="list">Values of a single row (one entry of a series' <c>Values</c>).</param>
    /// <returns>The model built from the row.</returns>
    protected abstract T BuildModel(List<object> list);

    /// <summary>
    /// Writes the given models to the database in batches of <see cref="WriteBatchSize"/> points.
    /// </summary>
    /// <param name="data">Models to persist.</param>
    /// <returns>
    /// <c>false</c> when <paramref name="data"/> is null or empty, or when at least one batch
    /// was rejected by the server; <c>true</c> when every batch was written.
    /// </returns>
    /// <remarks>Any exception is logged and then rethrown to the caller.</remarks>
    public async Task<bool> SaveData(List<T> data)
    {
        try
        {
            if (data == null || data.Count == 0)
            {
                return false;
            }

            // Materialize once so a lazily built sequence is not re-evaluated for every batch.
            var points = BuildPoints(data).ToList();
            var allWritten = true;
            for (var offset = 0; offset < points.Count; offset += WriteBatchSize)
            {
                var batch = points.GetRange(offset, Math.Min(WriteBatchSize, points.Count - offset));
                var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(batch, InfluxDbName());
                if (!response.Success)
                {
                    // ThrowError only logs, so remember the failure; the remaining batches are
                    // still attempted, but the caller must not be told the save succeeded.
                    ThrowError("写入时序数据库失败!");
                    allWritten = false;
                }
            }
            return allWritten;
        }
        catch (Exception ex)
        {
            ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Reads the rows of several measurements within a time range using one multi-query request.
    /// </summary>
    /// <param name="ids">Measurement identifiers; one query is issued per id.</param>
    /// <param name="st">Inclusive lower time bound.</param>
    /// <param name="et">Exclusive upper time bound, or null for no upper bound.</param>
    /// <returns>
    /// The models of all returned rows, in the order the results are returned;
    /// an empty list when <paramref name="ids"/> is null or empty or nothing matched.
    /// </returns>
    public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
    {
        var points = new List<T>();
        if (ids == null || ids.Count == 0)
        {
            return points;
        }

        var queries = ids.Select(id => BuildRangeQuery(id, st, et)).ToList();
        var results = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(queries, InfluxDbName());
        if (results == null)
        {
            return points;
        }

        // One entry per submitted query; each entry holds the series that query produced.
        foreach (var seriesOfQuery in results)
        {
            if (seriesOfQuery == null)
            {
                continue;
            }
            foreach (var serie in seriesOfQuery)
            {
                if (serie == null || serie.Values == null)
                {
                    continue;
                }
                foreach (var row in serie.Values)
                {
                    points.Add(BuildModel(row.ToList()));
                }
            }
        }
        return points;
    }

    /// <summary>
    /// Reads the rows of one measurement within a time range.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <param name="st">Inclusive lower time bound.</param>
    /// <param name="et">Exclusive upper time bound, or null for no upper bound.</param>
    /// <returns>The models of the rows of the first returned series; empty when nothing matched.</returns>
    public async Task<List<T>> GetDatas(Guid id, DateTime st, DateTime? et)
    {
        var points = new List<T>();
        var series = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(BuildRangeQuery(id, st, et), InfluxDbName());

        // Only the first series of the response is read.
        var firstSerie = series?.FirstOrDefault();
        if (firstSerie == null || firstSerie.Values == null)
        {
            return points;
        }

        foreach (var row in firstSerie.Values)
        {
            points.Add(BuildModel(row.ToList()));
        }
        return points;
    }

    /// <summary>
    /// Deletes every row of a measurement.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c>.</returns>
    public Task<bool> DeleteData(Guid id)
    {
        return ExecuteDeleteAsync($"delete FROM \"{id}\"");
    }

    /// <summary>
    /// Deletes the rows of a measurement stamped with exactly the given time.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <param name="time">Timestamp to delete; it is sent with second precision.</param>
    /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c>.</returns>
    public Task<bool> DeleteData(Guid id, DateTime time)
    {
        return ExecuteDeleteAsync($"delete FROM \"{id}\" WHERE \"time\" = '{FormatTime(time)}'");
    }

    /// <summary>
    /// Deletes the rows of a measurement within a time range.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <param name="st">Inclusive lower time bound, or null for no lower bound.</param>
    /// <param name="et">Exclusive upper time bound.</param>
    /// <returns><c>true</c> when the delete statement ran without an exception; otherwise <c>false</c>.</returns>
    public Task<bool> DeleteData(Guid id, DateTime? st, DateTime et)
    {
        var lowerBound = st.HasValue ? $"and \"time\" >= '{FormatTime(st.Value)}' " : "";
        return ExecuteDeleteAsync($"delete FROM \"{id}\" WHERE \"time\" < '{FormatTime(et)}' " + lowerBound);
    }

    /// <summary>
    /// Gets the most recent row of a measurement strictly before the given time.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <param name="time">Exclusive upper time bound.</param>
    /// <returns>The model of that row, or <c>default(T)</c> when no row exists.</returns>
    public Task<T> GetLastData(Guid id, DateTime time)
    {
        return QueryFirstRowAsync($" SELECT * FROM \"{id}\" WHERE time< '{FormatTime(time)}' order by time desc limit 1");
    }

    /// <summary>
    /// Gets the earliest row of a measurement.
    /// </summary>
    /// <param name="id">Measurement identifier.</param>
    /// <returns>The model of that row, or <c>default(T)</c> when the measurement has no rows.</returns>
    public Task<T> GetFirstData(Guid id)
    {
        return QueryFirstRowAsync($" SELECT * FROM \"{id}\" order by time limit 1");
    }

    /// <summary>
    /// Writes a message to the error log. Despite its name, this method does not throw.
    /// </summary>
    /// <param name="str">Message to log.</param>
    public void ThrowError(string str)
    {
        LoggerManager.GetLogger().Error(str);
    }

    /// <summary>
    /// Formats a time as a query literal, independent of the current thread culture.
    /// </summary>
    private static string FormatTime(DateTime time)
    {
        // Without an explicit culture the ':' separator and the calendar follow the
        // current culture, which could produce a literal the server cannot parse.
        return time.ToString(QueryTimeFormat, CultureInfo.InvariantCulture);
    }

    /// <summary>
    /// Builds the time-ordered SELECT statement shared by both GetDatas overloads.
    /// </summary>
    private static string BuildRangeQuery(Guid id, DateTime st, DateTime? et)
    {
        // An unset start time is moved to year 2001, presumably because year 0001 lies
        // outside the timestamp range the server accepts — TODO confirm.
        if (st == DateTime.MinValue)
        {
            st = st.AddYears(2000);
        }

        var upperBound = et.HasValue ? $"and time < '{FormatTime(et.Value)}' " : "";
        return $" SELECT * FROM \"{id}\" WHERE time >= '{FormatTime(st)}' " + upperBound + "order by time";
    }

    /// <summary>
    /// Runs a query and converts the first row of the first returned series into a model.
    /// </summary>
    private async Task<T> QueryFirstRowAsync(string query)
    {
        var series = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(query, InfluxDbName());

        // A series without rows yields no model instead of a NullReferenceException.
        var firstRow = series?.FirstOrDefault()?.Values?.FirstOrDefault();
        if (firstRow == null)
        {
            return default(T);
        }
        return BuildModel(firstRow.ToList());
    }

    /// <summary>
    /// Runs a delete statement; a failure is logged and reported as <c>false</c> instead of thrown.
    /// </summary>
    private async Task<bool> ExecuteDeleteAsync(string query)
    {
        try
        {
            await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(query, InfluxDbName());
            return true;
        }
        catch (Exception ex)
        {
            // Deletes stay best-effort, but the reason for a failure is no longer lost.
            ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
            return false;
        }
    }
}
}