// Listing metadata: 279 lines, 9.5 KiB, C#
using APT.BaseData.Domain.IServices;
|
||
using APT.Infrastructure.Core;
|
||
using System;
|
||
using System.Collections.Generic;
|
||
using System.Text;
|
||
using InfluxData.Net.InfluxDb.Models;
|
||
using System.Threading.Tasks;
|
||
using System.Linq;
|
||
using APT.Infrastructure.Api;
|
||
namespace APT.BaseData.Domain.Base
|
||
{
|
||
public abstract class InfluxDataBaseService<T>: CommonService, IInfluxDataBaseService<T>
|
||
{
|
||
/// <summary>
/// Initializes the service by forwarding the repository to <see cref="CommonService"/>.
/// </summary>
/// <param name="repository">Repository handed to the base service.</param>
/// <remarks>
/// Declared <c>protected</c> because the class is abstract: only derived types can construct it.
/// </remarks>
protected InfluxDataBaseService(IRepository repository) : base(repository)
{
}
|
||
/// <summary>
/// Gets the database name. The value is passed as the database argument of every
/// write and query call made by this class.
/// </summary>
/// <returns>The name of the target database.</returns>
protected abstract string InfluxDbName();
|
||
|
||
/// <summary>
/// Builds the points to be written from the given records.
/// </summary>
/// <param name="data">Records handed to <c>SaveData</c>.</param>
/// <returns>The points that <c>SaveData</c> writes in batches.</returns>
protected abstract IEnumerable<Point> BuildPoints(List<T> data);
|
||
|
||
/// <summary>
/// Builds a model from one row of values read back from a query result.
/// </summary>
/// <param name="list">The values of a single result row.</param>
/// <returns>The model created from the row.</returns>
protected abstract T BuildModel(List<object> list);
|
||
|
||
/// <summary>
/// Saves the given records by converting them with <see cref="BuildPoints"/> and
/// writing the resulting points in batches of 1000.
/// </summary>
/// <param name="data">Records to save.</param>
/// <returns>
/// <c>false</c> when <paramref name="data"/> is null or empty, or when at least one batch
/// write reports failure; otherwise <c>true</c>.
/// </returns>
/// <remarks>
/// Exceptions are logged through <see cref="ThrowError"/> and then rethrown.
/// </remarks>
public async Task<bool> SaveData(List<T> data)
{
    try
    {
        if (data == null || data.Count == 0)
            return false;

        // Materialize once: BuildPoints returns an IEnumerable that may be lazy, and
        // counting/skipping it on every iteration would rebuild it each time.
        var points = BuildPoints(data).ToList();
        const int pageLimit = 1000;
        var allWritten = true;

        for (var offset = 0; offset < points.Count; offset += pageLimit)
        {
            var batch = points.Skip(offset).Take(pageLimit).ToList();
            var response = await InfluxDbHelper.InfluxDbClient.Client.WriteAsync(batch, InfluxDbName());
            if (!response.Success)
            {
                // ThrowError only logs. Remember the failure so the caller is not told
                // the save succeeded; remaining batches are still attempted.
                ThrowError("写入时序数据库失败!");
                allWritten = false;
            }
        }

        return allWritten;
    }
    catch (Exception ex)
    {
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        throw;
    }
}
|
||
|
||
/// <summary>
/// Gets the data of several measurements within a time range, issuing one query per id
/// in a single multi-query request.
/// </summary>
/// <param name="ids">Ids used as measurement names.</param>
/// <param name="st">
/// Inclusive start time. <see cref="DateTime.MinValue"/> is moved forward by 2000 years,
/// matching the single-id overload.
/// </param>
/// <param name="et">Exclusive end time; when null no upper bound is applied.</param>
/// <returns>
/// The models built from every returned row, or an empty list when
/// <paramref name="ids"/> is null or empty or nothing is returned.
/// </returns>
public async Task<List<T>> GetDatas(List<Guid> ids, DateTime st, DateTime? et)
{
    var points = new List<T>();
    if (ids == null || ids.Count == 0)
        return points;

    // Same normalization as GetDatas(Guid, DateTime, DateTime?).
    if (st == DateTime.MinValue)
        st = st.AddYears(2000);

    // Format with the invariant culture so the query text does not depend on the
    // calendar or separators of the thread's current culture.
    var culture = System.Globalization.CultureInfo.InvariantCulture;
    const string timeFormat = "yyyy-MM-dd HH:mm:ss";
    var startText = st.ToString(timeFormat, culture);
    var endClause = et == null
        ? ""
        : $"and time < '{et.GetValueOrDefault().ToString(timeFormat, culture)}' ";

    var query = new List<string>();
    foreach (var id in ids)
    {
        query.Add($" SELECT * FROM \"{id}\" WHERE time >= '{startText}' " + endClause + "order by time");
    }

    var response = await InfluxDbHelper.InfluxDbClient.Client.MultiQueryAsync(query, InfluxDbName());

    // The response is a collection of series collections; flatten every row of every series.
    foreach (var resultSet in response)
    {
        foreach (var serie in resultSet)
        {
            foreach (var row in serie.Values)
            {
                points.Add(BuildModel(row.ToList()));
            }
        }
    }

    return points;
}
|
||
/// <summary>
/// Gets the data of one measurement within a time range, ordered by time.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <param name="st">
/// Inclusive start time. <see cref="DateTime.MinValue"/> is moved forward by 2000 years
/// before the query is built.
/// </param>
/// <param name="et">Exclusive end time; when null no upper bound is applied.</param>
/// <returns>
/// The models built from the rows of the first returned series, or an empty list when
/// the query returns no series.
/// </returns>
public async Task<List<T>> GetDatas(Guid id, DateTime st, DateTime? et)
{
    if (st == DateTime.MinValue)
        st = st.AddYears(2000);

    // Format with the invariant culture so the query text does not depend on the
    // calendar or separators of the thread's current culture.
    var culture = System.Globalization.CultureInfo.InvariantCulture;
    const string timeFormat = "yyyy-MM-dd HH:mm:ss";
    var endClause = et == null
        ? ""
        : $"and time < '{et.GetValueOrDefault().ToString(timeFormat, culture)}' ";
    var queries = $" SELECT * FROM \"{id}\" WHERE time >= '{st.ToString(timeFormat, culture)}' " + endClause + "order by time";

    var points = new List<T>();
    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    // Only the first series is read, as before.
    var firstSerie = response.FirstOrDefault();
    if (firstSerie == null || firstSerie.Values == null)
        return points;

    foreach (var row in firstSerie.Values)
    {
        points.Add(BuildModel(row.ToList()));
    }

    return points;
}
|
||
|
||
/// <summary>
/// Clears all data of one measurement.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <returns>
/// <c>true</c> when the delete statement was issued without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id)
{
    try
    {
        var queries = $"delete FROM \"{id}\"";
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the false-on-failure contract, but record the cause instead of dropping it.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
|
||
|
||
/// <summary>
/// Deletes the data of one measurement at an exact timestamp.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <param name="time">Timestamp to delete, matched with second precision.</param>
/// <returns>
/// <c>true</c> when the delete statement was issued without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id, DateTime time)
{
    try
    {
        // Invariant culture keeps the timestamp text independent of the thread's culture.
        var timeText = time.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
        var queries = $"delete FROM \"{id}\" WHERE \"time\" = '{timeText}'";
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the false-on-failure contract, but record the cause instead of dropping it.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
|
||
|
||
/// <summary>
/// Deletes the data of one measurement within a time range.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <param name="st">Inclusive start time; when null no lower bound is applied.</param>
/// <param name="et">Exclusive end time.</param>
/// <returns>
/// <c>true</c> when the delete statement was issued without an exception;
/// <c>false</c> when an exception occurred (it is logged, not rethrown).
/// </returns>
public async Task<bool> DeleteData(Guid id, DateTime? st, DateTime et)
{
    try
    {
        // Invariant culture keeps the timestamp text independent of the thread's culture.
        var culture = System.Globalization.CultureInfo.InvariantCulture;
        const string timeFormat = "yyyy-MM-dd HH:mm:ss";
        var startClause = st == null
            ? ""
            : $"and \"time\" >= '{st.GetValueOrDefault().ToString(timeFormat, culture)}' ";
        var queries = $"delete FROM \"{id}\" WHERE \"time\" < '{et.ToString(timeFormat, culture)}' " + startClause;
        await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());
        return true;
    }
    catch (Exception ex)
    {
        // Keep the false-on-failure contract, but record the cause instead of dropping it.
        ThrowError(LogHelper.GetCurSourceFileName() + " - " + LogHelper.GetLineNum() + ":" + ex.Message);
        return false;
    }
}
|
||
|
||
/// <summary>
/// Gets the latest data point of a measurement that lies strictly before the given time.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <param name="time">Exclusive upper bound for the point's timestamp.</param>
/// <returns>
/// The model built from the matching row, or <c>default(T)</c> when the query returns
/// no series or the first series has no rows.
/// </returns>
public async Task<T> GetLastData(Guid id, DateTime time)
{
    // Invariant culture keeps the timestamp text independent of the thread's culture.
    var timeText = time.ToString("yyyy-MM-dd HH:mm:ss", System.Globalization.CultureInfo.InvariantCulture);
    var queries = $" SELECT * FROM \"{id}\" WHERE time< '{timeText}' order by time desc limit 1";
    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    // Guard both levels: no series at all, or a series without rows. The original
    // called ToList() on a null row in the second case.
    var row = response.FirstOrDefault()?.Values?.FirstOrDefault();
    if (row == null)
        return default(T);

    return BuildModel(row.ToList());
}
|
||
|
||
/// <summary>
/// Gets the earliest data point of a measurement.
/// </summary>
/// <param name="id">Id used as the measurement name.</param>
/// <returns>
/// The model built from the first row ordered by time, or <c>default(T)</c> when the
/// query returns no series or the first series has no rows.
/// </returns>
public async Task<T> GetFirstData(Guid id)
{
    var queries = $" SELECT * FROM \"{id}\" order by time limit 1";
    var response = await InfluxDbHelper.InfluxDbClient.Client.QueryAsync(queries, InfluxDbName());

    // Guard both levels: no series at all, or a series without rows. The original
    // called ToList() on a null row in the second case.
    var row = response.FirstOrDefault()?.Values?.FirstOrDefault();
    if (row == null)
        return default(T);

    return BuildModel(row.ToList());
}
|
||
|
||
/// <summary>
/// Writes a message to the error log.
/// </summary>
/// <param name="str">The message to log.</param>
/// <remarks>Despite its name, this method does not throw; it only logs.</remarks>
public void ThrowError(string str) => LoggerManager.GetLogger().Error(str);
|
||
}
|
||
}
|