// mh_frame_sps/Apt.Infrastructure.Utility/Kafka/KafkaHelper.cs
//
// 345 lines
// 16 KiB
// C#
// Raw Permalink Normal View History
//
// 2026-04-07 13:47:52 +08:00
//using Confluent.Kafka;
//using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace APT.Infrastructure.Utility
{
// NOTE: Everything inside this namespace is commented out (as are the Confluent.Kafka
// using directives at the top of the file), so the namespace currently declares no types.
// The disabled KafkaHelper code is kept for reference. NOTE(review) comments below flag
// issues visible in that code that should be addressed before it is re-enabled.
/// <summary>
/// Kafka helper class
/// </summary>
//public static class KafkaHelper
//{
// /// <summary>
// /// Produce a single message
// /// </summary>
// /// <param name="broker">Broker server address (IP)</param>
// /// <param name="topic">Topic name</param>
// /// <param name="key">Message key</param>
// /// <param name="text">Message content</param>
// /// <returns></returns>
// NOTE(review): a new producer is built on every call, and text is only null-checked
// after the producer has been built. Failures are rethrown as a plain Exception that
// does not carry the original ProduceException as its inner exception.
// public static async Task Produce(string broker, string topic, string key, string text)
// {
// var config = new ProducerConfig { BootstrapServers = broker };
// using (var producer = new ProducerBuilder<string, string>(config).Build())
// {
// if (text == null)
// {
// throw (new Exception("内容不可为空"));
// }
// try
// {
// // Note: Awaiting the asynchronous produce request below prevents flow of execution
// // from proceeding until the acknowledgement from the broker is received (at the
// // expense of low throughput).
// var deliveryReport = await producer.ProduceAsync(
// topic, new Message<string, string> { Key = key, Value = text });
// }
// catch (ProduceException<string, string> e)
// {
// throw (new Exception($"failed to deliver message: {e.Message} [{e.Error.Code}]"));
// }
// // Since we are producing synchronously, at this point there will be no messages
// // in-flight and no delivery reports waiting to be acknowledged, so there is no
// // need to call producer.Flush before disposing the producer.
// }
// }
// /// <summary>
// /// Consume messages
// /// </summary>
// /// <param name="broker">Broker server address (IP)</param>
// /// <param name="topic">Topic name</param>
// /// <param name="cts">Cancellation token source; its token is passed to the consume loop</param>
// /// <param name="action">Callback that processes a batch of received messages</param>
// /// <param name="commitPeriod">Batch size: the callback runs and offsets are committed every this many messages</param>
// /// <param name="groupId">Consumer group id</param>
// /// <param name="isContinue">Whether to keep reading after a batch / end of partition instead of returning</param>
// /// <returns></returns>
// NOTE(review): declared async but contains no await, and RunConsume is synchronous, so
// the call runs to completion on the caller's thread. The ProducerConfig local is never used.
// public static async Task Consume(string broker, string topic, CancellationTokenSource cts, Action<List<string>> action,
// int commitPeriod = 5, string groupId = "optKafkaGroup", bool isContinue = false)
// {
// var config = new ProducerConfig { BootstrapServers = broker };
// RunConsume(broker, new List<string> { topic }, cts.Token, commitPeriod, groupId, action, isContinue);
// }
// NOTE(review): same as Consume above - async without await, unused ProducerConfig local.
// public static async Task ConsumeParallel(string broker, string topic, CancellationTokenSource cts, Action<List<string>> action,
// int commitPeriod = 5, string groupId = "optKafkaGroup")
// {
// var config = new ProducerConfig { BootstrapServers = broker };
// RunConsumeParallel(broker, new List<string> { topic }, cts.Token, commitPeriod, groupId, action);
// }
// /// <summary>
// /// Create a topic
// /// </summary>
// /// <param name="broker">Broker server</param>
// /// <param name="topic">Topic name</param>
// /// <param name="replicationFactor">Replication factor</param>
// /// <param name="numPartitions">Number of partitions</param>
// /// <returns></returns>
// public static async Task CreateTopicAsync(string broker, string topic, short replicationFactor = 1, short numPartitions = 1)
// {
// using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = broker }).Build())
// {
// try
// {
// await adminClient.CreateTopicsAsync(new TopicSpecification[] {
// new TopicSpecification { Name = topic, ReplicationFactor = replicationFactor, NumPartitions =numPartitions } });
// }
// catch (CreateTopicsException e)
// {
// Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
// NOTE(review): "throw e;" resets the stack trace; a bare "throw;" would preserve it.
// The same pattern is used in the consume methods below.
// throw e;
// }
// }
// }
// Subscribes to the given topics and reads messages in batches of commitPeriod, invoking
// Action with the batch and then committing the offset of the last message in the batch.
// NOTE(review): commitPeriod == 0 would make "Count % commitPeriod" throw DivideByZeroException.
// private static void RunConsume(string brokerList, List<string> topics, CancellationToken cancellationToken, int commitPeriod, string groupId,
// Action<List<string>> Action, bool isContinue)
// {
// var config = new ConsumerConfig
// {
// BootstrapServers = brokerList,
// GroupId = groupId,
// EnableAutoCommit = false,
// StatisticsIntervalMs = 5000,
// SessionTimeoutMs = 6000,
// AutoOffsetReset = AutoOffsetReset.Earliest,
// EnablePartitionEof = true
// };
// //const int commitPeriod = 5;
// // Note: If a key or value deserializer is not set (as is the case below), the
// // deserializer corresponding to the appropriate type from Confluent.Kafka.Deserializers
// // will be used automatically (where available). The default deserializer for string
// // is UTF8. The default deserializer for Ignore returns null for all input data
// // (including non-null data).
// using (var consumer = new ConsumerBuilder<Ignore, string>(config)
// // Note: All handlers are called on the main .Consume thread.
// .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
// .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
// .SetPartitionsAssignedHandler((c, partitions) =>
// {
// Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
// // possibly manually specify start offsets or override the partition assignment provided by
// // the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
// //
// // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
// })
// .SetPartitionsRevokedHandler((c, partitions) =>
// {
// Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
// })
// .Build())
// {
// consumer.Subscribe(topics);
// ConsumeResult<Ignore, string> consumeResult = null;
// ConsumeResult<Ignore, string> lastConsumeResult = null;
// List<string> ReciveMessage = new List<string>();
// try
// {
// while (true)
// {
// try
// {
// consumeResult = consumer.Consume(cancellationToken);
// if (consumeResult.IsPartitionEOF)
// {
// //Console.WriteLine(
// //$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
// NOTE(review): lastConsumeResult is not updated (or reset) when a full batch is
// committed further down, because that branch continues/breaks before the
// assignment at the end of the loop. After a full batch, reaching end-of-partition
// therefore invokes Action with an empty list and commits the older
// lastConsumeResult - looks like this can move the committed offset backwards; confirm.
// if (lastConsumeResult != null)
// {
// Action(ReciveMessage);// invoke the handler with the partial batch
// try
// {
// consumer.Commit(lastConsumeResult);
// }
// catch (KafkaException e)
// {
// Console.WriteLine($"Commit error: {e.Error.Reason}");
// throw (e);
// }
// finally
// {
// ReciveMessage.Clear();
// }
// }
// if (isContinue)
// continue;
// else
// break;
// }
// ReciveMessage.Add(consumeResult.Message.Value);
// //Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
// if (ReciveMessage.Count % commitPeriod == 0)
// {
// // The Commit method sends a "commit offsets" request to the Kafka
// // cluster and synchronously waits for the response. This is very
// // slow compared to the rate at which the consumer is capable of
// // consuming messages. A high performance application will typically
// // commit offsets relatively infrequently and be designed to handle
// // duplicate messages in the event of failure.
// Action(ReciveMessage);// invoke the handler with the full batch
// try
// {
// consumer.Commit(consumeResult);
// }
// catch (KafkaException e)
// {
// Console.WriteLine($"Commit error: {e.Error.Reason}");
// throw (e);
// }
// finally
// {
// ReciveMessage.Clear();
// }
// if (isContinue)
// continue;
// else
// break;
// }
// lastConsumeResult = consumeResult;
// }
// catch (ConsumeException e)
// {
// Console.WriteLine($"Consume error: {e.Error.Reason}");
// throw (e);
// }
// }
// }
// catch (OperationCanceledException e)
// {
// Console.WriteLine("Closing consumer.");
// consumer.Close();
// throw (e);
// }
// catch (Exception e)
// {
// throw (e);
// }
// }
// }
// Starts commitPeriod tasks that each call Consume once on the same consumer, waits for
// all of them, then passes the non-empty message values (ordered by offset) to Action.
// NOTE(review): the tasks share one consumer instance, one List<T> (consumeResultList) and
// the captured consumeResult local without any synchronization. List<T> is not safe for
// concurrent Add calls, and the consumer is presumably not meant to be used from several
// threads at once - verify against the Confluent.Kafka documentation before re-enabling.
// private static void RunConsumeParallel(string brokerList, List<string> topics, CancellationToken cancellationToken, int commitPeriod, string groupId,
// Action<List<string>> Action)
// {
// var config = new ConsumerConfig
// {
// BootstrapServers = brokerList,
// GroupId = groupId,
// EnableAutoCommit = false,
// StatisticsIntervalMs = 5000,
// SessionTimeoutMs = 6000,
// AutoOffsetReset = AutoOffsetReset.Earliest,
// EnablePartitionEof = true
// };
// //const int commitPeriod = 5;
// // Note: If a key or value deserializer is not set (as is the case below), the
// // deserializer corresponding to the appropriate type from Confluent.Kafka.Deserializers
// // will be used automatically (where available). The default deserializer for string
// // is UTF8. The default deserializer for Ignore returns null for all input data
// // (including non-null data).
// using (var consumer = new ConsumerBuilder<Ignore, string>(config)
// // Note: All handlers are called on the main .Consume thread.
// .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
// .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
// .SetPartitionsAssignedHandler((c, partitions) =>
// {
// Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
// // possibly manually specify start offsets or override the partition assignment provided by
// // the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
// //
// // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
// })
// .SetPartitionsRevokedHandler((c, partitions) =>
// {
// Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
// })
// .Build())
// {
// consumer.Subscribe(topics);
// List<ConsumeResult<Ignore, string>> consumeResultList = new List<ConsumeResult<Ignore, string>>();
// ConsumeResult<Ignore, string> consumeResult = null;
// NOTE(review): lastConsumeResult is never used in this method.
// ConsumeResult<Ignore, string> lastConsumeResult = null;
// List<string> ReciveMessage = new List<string>();
// try
// {
// Task[] tasks = new Task[commitPeriod];
// for (var x = 0; x < commitPeriod; x++)
// {
// tasks[x] = Task.Factory.StartNew(() =>
// {
// try
// {
// consumeResult = consumer.Consume(cancellationToken);
// consumeResultList.Add(consumeResult);
// if (consumeResult.IsPartitionEOF)
// {
// }
// }
// catch (ConsumeException e)
// {
// Console.WriteLine($"Consume error: {e.Error.Reason}");
// throw (e);
// };
// });
// }
// NOTE(review): Task.WaitAll reports task failures wrapped in an AggregateException, so
// the OperationCanceledException handler below (which closes the consumer) may not match
// a cancellation raised inside a task - confirm.
// Task.WaitAll(tasks);
// NOTE(review): ordering by Offset.Value alone ignores the partition; with more than one
// assigned partition the "last" result is presumably not meaningful - confirm.
// consumeResultList = consumeResultList.OrderBy(i => i.Offset.Value).ToList();
// foreach (var s in consumeResultList)
// {
// if (s.Message != null && !string.IsNullOrEmpty(s.Message.Value))
// {
// ReciveMessage.Add(s.Message.Value);
// }
// }
// if (ReciveMessage.Any())
// {
// NOTE(review): offsets are committed before Action runs (RunConsume does the opposite),
// so a failure inside Action happens after the batch is already marked as consumed.
// consumer.Commit(consumeResultList.LastOrDefault());
// Action(ReciveMessage);// invoke the handler
// }
// }
// catch (OperationCanceledException e)
// {
// Console.WriteLine("Closing consumer.");
// consumer.Close();
// throw (e);
// }
// catch (Exception e)
// {
// throw (e);
// }
// }
// }
//}
}