//using Confluent.Kafka;
//using Confluent.Kafka.Admin;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace APT.Infrastructure.Utility
{
    // Kafka helper class (produce messages, consume messages, create topics).
    //
    // The whole class is commented out, as are the Confluent.Kafka using directives at the
    // top of this file, so nothing in this namespace is currently compiled.
    //
    // NOTE(review): points to address before this code is re-enabled:
    //  * Generic type arguments and XML doc tags below were reconstructed from usage
    //    (e.g. ConsumerBuilder<Ignore, string>); verify them against Confluent.Kafka.
    //  * "throw e;" / "throw (e);" reset the stack trace; prefer a bare "throw;".
    //  * Consume and ConsumeParallel are declared async but never await, and each one
    //    builds a ProducerConfig that is never used.
    //  * RunConsume only updates lastConsumeResult on iterations that do not commit a
    //    batch, so the end-of-partition branch can invoke the callback with an empty list
    //    and commit an offset older than the one committed for the last full batch.
    //  * RunConsumeParallel writes consumeResult / consumeResultList from several tasks
    //    without synchronization, and commits the offset before invoking the callback
    //    (RunConsume does it the other way round).
    //public static class KafkaHelper
    //{
    //    /// <summary>
    //    /// Produces a single message.
    //    /// </summary>
    //    /// <param name="broker">Broker address (server IP)</param>
    //    /// <param name="topic">Topic</param>
    //    /// <param name="key">Message key</param>
    //    /// <param name="text">Message content</param>
    //    /// <returns></returns>
    //    public static async Task Produce(string broker, string topic, string key, string text)
    //    {
    //        var config = new ProducerConfig { BootstrapServers = broker };
    //        using (var producer = new ProducerBuilder<string, string>(config).Build())
    //        {
    //            if (text == null)
    //            {
    //                throw (new Exception("内容不可为空"));
    //            }
    //            try
    //            {
    //                // Note: Awaiting the asynchronous produce request below prevents flow of execution
    //                // from proceeding until the acknowledgement from the broker is received (at the
    //                // expense of low throughput).
    //                var deliveryReport = await producer.ProduceAsync(
    //                    topic, new Message<string, string> { Key = key, Value = text });
    //            }
    //            catch (ProduceException<string, string> e)
    //            {
    //                throw (new Exception($"failed to deliver message: {e.Message} [{e.Error.Code}]"));
    //            }
    //            // Since we are producing synchronously, at this point there will be no messages
    //            // in-flight and no delivery reports waiting to be acknowledged, so there is no
    //            // need to call producer.Flush before disposing the producer.
    //        }
    //    }

    //    /// <summary>
    //    /// Consumes messages from a topic.
    //    /// </summary>
    //    /// <param name="broker">Broker address (server IP)</param>
    //    /// <param name="topic">Topic</param>
    //    /// <param name="cts">Cancellation token source</param>
    //    /// <param name="action">Callback that handles the received messages</param>
    //    /// <param name="commitPeriod">Number of messages collected before the callback is invoked and the offset is committed</param>
    //    /// <param name="groupId">Consumer group</param>
    //    /// <param name="isContinue">Whether to keep reading after a batch or end of partition</param>
    //    /// <returns></returns>
    //    public static async Task Consume(string broker, string topic, CancellationTokenSource cts, Action<List<string>> action,
    //        int commitPeriod = 5, string groupId = "optKafkaGroup", bool isContinue = false)
    //    {
    //        var config = new ProducerConfig { BootstrapServers = broker };
    //        RunConsume(broker, new List<string> { topic }, cts.Token, commitPeriod, groupId, action, isContinue);
    //    }

    //    public static async Task ConsumeParallel(string broker, string topic, CancellationTokenSource cts, Action<List<string>> action,
    //        int commitPeriod = 5, string groupId = "optKafkaGroup")
    //    {
    //        var config = new ProducerConfig { BootstrapServers = broker };
    //        RunConsumeParallel(broker, new List<string> { topic }, cts.Token, commitPeriod, groupId, action);
    //    }

    //    /// <summary>
    //    /// Creates a topic.
    //    /// </summary>
    //    /// <param name="broker">Broker address (server)</param>
    //    /// <param name="topic">Topic</param>
    //    /// <param name="replicationFactor">Replication factor (number of copies)</param>
    //    /// <param name="numPartitions">Number of partitions</param>
    //    /// <returns></returns>
    //    public static async Task CreateTopicAsync(string broker, string topic, short replicationFactor = 1, short numPartitions = 1)
    //    {
    //        using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = broker }).Build())
    //        {
    //            try
    //            {
    //                await adminClient.CreateTopicsAsync(new TopicSpecification[] {
    //                    new TopicSpecification { Name = topic, ReplicationFactor = replicationFactor, NumPartitions =numPartitions } });
    //            }
    //            catch (CreateTopicsException e)
    //            {
    //                Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
    //                throw e;
    //            }
    //        }
    //    }

    //    private static void RunConsume(string brokerList, List<string> topics, CancellationToken cancellationToken, int commitPeriod, string groupId,
    //        Action<List<string>> Action, bool isContinue)
    //    {
    //        var config = new ConsumerConfig
    //        {
    //            BootstrapServers = brokerList,
    //            GroupId = groupId,
    //            EnableAutoCommit = false,
    //            StatisticsIntervalMs = 5000,
    //            SessionTimeoutMs = 6000,
    //            AutoOffsetReset = AutoOffsetReset.Earliest,
    //            EnablePartitionEof = true
    //        };

    //        //const int commitPeriod = 5;

    //        // Note: If a key or value deserializer is not set (as is the case below), the
    //        // deserializer corresponding to the appropriate type from Confluent.Kafka.Deserializers
    //        // will be used automatically (where available). The default deserializer for string
    //        // is UTF8. The default deserializer for Ignore returns null for all input data
    //        // (including non-null data).
    //        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    //            // Note: All handlers are called on the main .Consume thread.
    //            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    //            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
    //            .SetPartitionsAssignedHandler((c, partitions) =>
    //            {
    //                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
    //                // possibly manually specify start offsets or override the partition assignment provided by
    //                // the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
    //                //
    //                // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
    //            })
    //            .SetPartitionsRevokedHandler((c, partitions) =>
    //            {
    //                Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    //            })
    //            .Build())
    //        {
    //            consumer.Subscribe(topics);
    //            ConsumeResult<Ignore, string> consumeResult = null;
    //            ConsumeResult<Ignore, string> lastConsumeResult = null;
    //            List<string> ReciveMessage = new List<string>();
    //            try
    //            {
    //                while (true)
    //                {
    //                    try
    //                    {
    //                        consumeResult = consumer.Consume(cancellationToken);

    //                        if (consumeResult.IsPartitionEOF)
    //                        {
    //                            //Console.WriteLine(
    //                            //$"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
    //                            if (lastConsumeResult != null)
    //                            {
    //                                Action(ReciveMessage); // invoke the callback
    //                                try
    //                                {
    //                                    consumer.Commit(lastConsumeResult);
    //                                }
    //                                catch (KafkaException e)
    //                                {
    //                                    Console.WriteLine($"Commit error: {e.Error.Reason}");
    //                                    throw (e);
    //                                }
    //                                finally
    //                                {
    //                                    ReciveMessage.Clear();
    //                                }
    //                            }
    //                            if (isContinue)
    //                                continue;
    //                            else
    //                                break;
    //                        }
    //                        ReciveMessage.Add(consumeResult.Message.Value);
    //                        //Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

    //                        if (ReciveMessage.Count % commitPeriod == 0)
    //                        {
    //                            // The Commit method sends a "commit offsets" request to the Kafka
    //                            // cluster and synchronously waits for the response. This is very
    //                            // slow compared to the rate at which the consumer is capable of
    //                            // consuming messages. A high performance application will typically
    //                            // commit offsets relatively infrequently and be designed handle
    //                            // duplicate messages in the event of failure.
    //                            Action(ReciveMessage); // invoke the callback
    //                            try
    //                            {
    //                                consumer.Commit(consumeResult);
    //                            }
    //                            catch (KafkaException e)
    //                            {
    //                                Console.WriteLine($"Commit error: {e.Error.Reason}");
    //                                throw (e);
    //                            }
    //                            finally
    //                            {
    //                                ReciveMessage.Clear();
    //                            }
    //                            if (isContinue)
    //                                continue;
    //                            else
    //                                break;
    //                        }
    //                        lastConsumeResult = consumeResult;
    //                    }
    //                    catch (ConsumeException e)
    //                    {
    //                        Console.WriteLine($"Consume error: {e.Error.Reason}");
    //                        throw (e);
    //                    }
    //                }
    //            }
    //            catch (OperationCanceledException e)
    //            {
    //                Console.WriteLine("Closing consumer.");
    //                consumer.Close();
    //                throw (e);
    //            }
    //            catch (Exception e)
    //            {
    //                throw (e);
    //            }
    //        }
    //    }

    //    private static void RunConsumeParallel(string brokerList, List<string> topics, CancellationToken cancellationToken, int commitPeriod, string groupId,
    //        Action<List<string>> Action)
    //    {
    //        var config = new ConsumerConfig
    //        {
    //            BootstrapServers = brokerList,
    //            GroupId = groupId,
    //            EnableAutoCommit = false,
    //            StatisticsIntervalMs = 5000,
    //            SessionTimeoutMs = 6000,
    //            AutoOffsetReset = AutoOffsetReset.Earliest,
    //            EnablePartitionEof = true
    //        };

    //        //const int commitPeriod = 5;

    //        // Note: If a key or value deserializer is not set (as is the case below), the
    //        // deserializer corresponding to the appropriate type from Confluent.Kafka.Deserializers
    //        // will be used automatically (where available). The default deserializer for string
    //        // is UTF8. The default deserializer for Ignore returns null for all input data
    //        // (including non-null data).
    //        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
    //            // Note: All handlers are called on the main .Consume thread.
    //            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    //            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
    //            .SetPartitionsAssignedHandler((c, partitions) =>
    //            {
    //                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
    //                // possibly manually specify start offsets or override the partition assignment provided by
    //                // the consumer group by returning a list of topic/partition/offsets to assign to, e.g.:
    //                //
    //                // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
    //            })
    //            .SetPartitionsRevokedHandler((c, partitions) =>
    //            {
    //                Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
    //            })
    //            .Build())
    //        {
    //            consumer.Subscribe(topics);
    //            List<ConsumeResult<Ignore, string>> consumeResultList = new List<ConsumeResult<Ignore, string>>();
    //            ConsumeResult<Ignore, string> consumeResult = null;
    //            ConsumeResult<Ignore, string> lastConsumeResult = null;
    //            List<string> ReciveMessage = new List<string>();
    //            try
    //            {
    //                Task[] tasks = new Task[commitPeriod];
    //                for (var x = 0; x < commitPeriod; x++)
    //                {
    //                    tasks[x] = Task.Factory.StartNew(() =>
    //                    {
    //                        try
    //                        {
    //                            consumeResult = consumer.Consume(cancellationToken);
    //                            consumeResultList.Add(consumeResult);
    //                            if (consumeResult.IsPartitionEOF)
    //                            {
    //                            }
    //                        }
    //                        catch (ConsumeException e)
    //                        {
    //                            Console.WriteLine($"Consume error: {e.Error.Reason}");
    //                            throw (e);
    //                        };
    //                    });
    //                }
    //                Task.WaitAll(tasks);
    //                consumeResultList = consumeResultList.OrderBy(i => i.Offset.Value).ToList();
    //                foreach (var s in consumeResultList)
    //                {
    //                    if (s.Message != null && !string.IsNullOrEmpty(s.Message.Value))
    //                    {
    //                        ReciveMessage.Add(s.Message.Value);
    //                    }
    //                }
    //                if (ReciveMessage.Any())
    //                {
    //                    consumer.Commit(consumeResultList.LastOrDefault());
    //                    Action(ReciveMessage); // invoke the callback
    //                }
    //            }
    //            catch (OperationCanceledException e)
    //            {
    //                Console.WriteLine("Closing consumer.");
    //                consumer.Close();
    //                throw (e);
    //            }
    //            catch (Exception e)
    //            {
    //                throw (e);
    //            }
    //        }
    //    }
    //}
}