一、引言
有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续。今天正好是周末,有点时间,来写新东西吧。最近公司用了Kafka做为消息的中间件,最开始写的那个版本不是很好,我就要来优化它,所以就抽了一些时间来研究Kafka。很多概念性的东西就不写了,今天主要是上干货,主要是代码,今天就把Kafka的消费者和生产者的代码贴出来,以供大家参考,当然这个是代码样板,最后我也会把地址贴出来。以后有时间我会把我自己实现的Kafka消息的生产者和消费者的代码贴出来。好了,话不多说,言归正传。 说明一点,如果想调试这里的代码,必须引入Confluent.Kafka这个dll才可以,直接在Visual Studio 项目的 Nuget 里面可以查找,直接安装就可以了。二、消息的生产者(Kafka消息的Producer) 大多数的消息中间件都包含三个部分,一个是消息的生产者,一个是存放消息的队列,另外一个就是消息的消费者,我们就按着这个顺序,我就先把消息生产者的代码写出来。直接上代码,其实不是很难,里面有很多备注,只要有基本的概念理解起来还是很容易的。 第一个版本,同步版本!1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer10 {11 public class Program12 {13 public static void Main(string[] args)14 {15 if (args.Length != 2)16 {17 Console.WriteLine("Usage: .. brokerList topicName");18 return;19 }20 21 string brokerList = args[0];22 string topicName = args[1];23 24 var config = new Dictionary{ { "bootstrap.servers", brokerList } };25 26 using (var producer = new Producer (config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))27 {28 var cancelled = false;29 Console.CancelKeyPress += (_, e) => {30 e.Cancel = true; // 阻止进程退出31 cancelled = true;32 };33 34 while (!cancelled)35 {36 Console.Write("> ");37 38 string text;39 try40 {41 text = Console.ReadLine();42 }43 catch (IOException)44 {45 // IO 异常抛出的时候设置此值ConsoleCancelEventArgs.Cancel == true.46 break;47 }48 if (text == null)49 {50 break;51 }52 53 string key = null;54 string val = text;55 56 // 如果指定了键和值,则拆分行.57 int index = text.IndexOf(" ");58 if (index != -1)59 {60 key = text.Substring(0, index);61 val = text.Substring(index + 1);62 }63 64 // 在下面的异步生产请求上调用.Result会导致它阻塞,直到它完成。 通常,您应该避免同步生成,因为这会对吞吐量产生巨大影响。对于这个交互式控制台的例子,这是我们想要的。65 var deliveryReport = producer.ProduceAsync(topicName, key, val).Result;66 Console.WriteLine(67 deliveryReport.Error.Code == ErrorCode.NoError68 ? "delivered to: "+deliveryReport.TopicPartitionOffset69 : "failed to deliver message: "+deliveryReport.Error.Reason70 );71 }72 73 // 由于我们是同步的生产消息,此时不会有消息在传输并且也不需要等待消息到达的确认应答, 销毁生产者之前我们是不需要调用 producer.Flush 方法, 就像正常使用一样。74 }75 }76 }77 }
第二个版本,异步版本,推荐使用
1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer10 {11 public class Program12 {13 public static void Main(string[] args)14 {15 if (args.Length != 2)16 {17 Console.WriteLine("Usage: .. brokerList topicName");18 return;19 }20 21 string brokerList = args[0];22 string topicName = args[1];23 string message="我就是要传输的消息内容";24 25 //这是以异步方式生产消息的代码实例26 var config = new Dictionary{ { "bootstrap.servers", brokerList } };27 using (var producer = new Producer (config, null, new StringSerializer(Encoding.UTF8)))28 {29 var deliveryReport = producer.ProduceAsync(topicName, null, message);30 deliveryReport.ContinueWith(task =>31 {32 Console.WriteLine("Producer: "+producer.Name+"\r\nTopic: "+topicName+"\r\nPartition: "+task.Result.Partition+"\r\nOffset: "+task.Result.Offset);33 });34 35 producer.Flush(TimeSpan.FromSeconds(10));36 }37 }38 }39 }
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using Confluent.Kafka.Serialization; 6 7 8 ///9 /// 演示如何使用Consumer客户端. 10 /// 11 namespace Confluent.Kafka.Examples.Consumer 12 { 13 public class Program 14 { 15 ///16 // 在这个例子中: 17 /// - offsets 是自动提交的。 18 /// - consumer.Poll / OnMessage 是用于消息消费的。 19 /// - 没有为轮询循环创建(Poll)二外的线程,当然可以创建 20 /// 21 public static void Run_Poll(string brokerList, Listtopics) 22 { 23 var config = new Dictionary 24 { 25 { "bootstrap.servers", brokerList }, 26 { "group.id", "csharp-consumer" }, 27 { "enable.auto.commit", true }, // 默认值 28 { "auto.commit.interval.ms", 5000 }, 29 { "statistics.interval.ms", 60000 }, 30 { "session.timeout.ms", 6000 }, 31 { "auto.offset.reset", "smallest" } 32 }; 33 34 using (var consumer = new Consumer (config, null, new StringDeserializer(Encoding.UTF8))) 35 { 36 // 注意: 所有事件处理程序的执行都是在主线程中执行的,就是同步的。 37 38 //当成功消费了消息就会触发该事件 39 consumer.OnMessage += (_, msg) => Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 40 41 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset); 42 43 //当然发生了严重错误,比如,连接丢失或者Kafka服务器无效就会触发该事件 44 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error); 45 46 //当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件 47 consumer.OnConsumeError += (_, msg) 48 => Console.WriteLine("Error consuming from topic/partition/offset "+msg.Topic+"/"+msg.Partition+"/"+msg.Offset+": "+msg.Error); 49 50 //成功提交了Offsets会触发该事件 51 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets: "+commit.Error : "Successfully committed offsets: "+commit.Offsets); 52 53 // 当消费者被分配一组新的分区时触发该事件 54 consumer.OnPartitionsAssigned += (_, partitions) => 55 { 56 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId); 57 // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。 58 //开始从分区中消息消息 59 consumer.Assign(partitions); 60 }; 61 62 // 当消费者的当前分区集已被撤销时引发该事件。 63 consumer.OnPartitionsRevoked += (_, partitions) => 64 { 65 Console.WriteLine("Revoked partitions:"+partitions); 66 // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。 67 68 //停止从分区中消费消息 69 consumer.Unassign(); 70 }; 71 72 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json); 73 74 consumer.Subscribe(topics); 75 76 Console.WriteLine("Subscribed to:"+consumer.Subscription); 77 78 var cancelled = false; 79 Console.CancelKeyPress += (_, e) => { 80 e.Cancel = true; // 组织进程退出 81 cancelled = true; 82 }; 83 84 Console.WriteLine("Ctrl-C to exit."); 85 while (!cancelled) 86 { 87 consumer.Poll(TimeSpan.FromMilliseconds(100)); 88 } 89 } 90 } 91 92 /// 93 /// 在这实例中 94 /// - offsets 是手动提交的。 95 /// - consumer.Consume方法用于消费消息 96 /// (所有其他事件仍由事件处理程序处理) 97 /// -没有为了 轮询(消耗)循环 创建额外的线程。 98 /// 99 public static void Run_Consume(string brokerList, Listtopics)100 {101 var config = new Dictionary 102 {103 { "bootstrap.servers", brokerList },104 { "group.id", "csharp-consumer" },105 { "enable.auto.commit", false },106 { "statistics.interval.ms", 60000 },107 { "session.timeout.ms", 6000 },108 { "auto.offset.reset", "smallest" }109 };110 111 using (var consumer = new Consumer (config, null, new StringDeserializer(Encoding.UTF8)))112 {113 // 注意:所有事件处理都是在主线程中处理的,也就是说同步的114 115 consumer.OnPartitionEOF += (_, end)116 => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset);117 118 consumer.OnError += (_, error)=> Console.WriteLine("Error: "+error);119 120 // 当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件121 consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error: "+error);122 123 // 当消费者被分配一组新的分区时触发该事件124 consumer.OnPartitionsAssigned += (_, partitions) =>125 {126 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId);127 // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。128 //开始从分区中消息消息129 consumer.Assign(partitions);130 };131 132 // 当消费者的当前分区集已被撤销时引发该事件。133 consumer.OnPartitionsRevoked += (_, partitions) =>134 {135 Console.WriteLine("Revoked partitions:"+partitions);136 // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。137 138 //停止从分区中消费消息139 consumer.Unassign();140 };141 142 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json);143 144 consumer.Subscribe(topics);145 146 Console.WriteLine("Started consumer, Ctrl-C to stop consuming");147 148 var cancelled = false;149 Console.CancelKeyPress += (_, e) => {150 e.Cancel = true; // 防止进程退出151 cancelled = true;152 };153 154 while (!cancelled)155 {156 if (!consumer.Consume(out Message msg, TimeSpan.FromMilliseconds(100)))157 {158 continue;159 }160 161 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);162 163 if (msg.Offset % 5 == 0)164 {165 var committedOffsets = consumer.CommitAsync(msg).Result;166 Console.WriteLine("Committed offset: "+committedOffsets);167 }168 }169 }170 }171 172 /// 173 /// 在这个例子中174 /// - 消费者组功能(即.Subscribe +offset提交)不被使用。175 /// - 将消费者手动分配给分区,并始终从特定偏移量(0)开始消耗。176 /// 177 public static void Run_ManualAssign(string brokerList, Listtopics)178 {179 var config = new Dictionary 180 {181 // 即使您不打算使用任何使用者组功能,也必须在创建使用者时指定group.id属性。182 { "group.id", new Guid().ToString() },183 { "bootstrap.servers", brokerList },184 // 即使消费者没有订阅该组,也可以将分区偏移量提交给一个组。 在这个例子中,自动提交被禁用以防止发生这种情况。185 { "enable.auto.commit", false }186 };187 188 using (var consumer = new Consumer (config, null, new StringDeserializer(Encoding.UTF8)))189 {190 //总是从0开始消费191 consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());192 193 // 引发严重错误,例如 连接失败或所有Kafka服务器失效。194 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error);195 196 // 这个事件是由于在反序列化出现错误,或者在消息消息的时候出现错误,也就是 error != NoError 的时候引发该事件197 consumer.OnConsumeError += (_, error) => Console.WriteLine("Consume error: "+error);198 199 while (true)200 {201 if (consumer.Consume(out Message msg, TimeSpan.FromSeconds(1)))202 {203 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);204 }205 }206 }207 }208 209 private static void PrintUsage()=> Console.WriteLine("Usage: .. [topic..]");210 211 public static void Main(string[] args)212 {213 if (args.Length < 3)214 {215 PrintUsage();216 return;217 }218 219 var mode = args[0];220 var brokerList = args[1];221 var topics = args.Skip(2).ToList();222 223 switch (mode)224 {225 case "poll":226 Run_Poll(brokerList, topics);227 break;228 case "consume":229 Run_Consume(brokerList, topics);230 break;231 case "manual":232 Run_ManualAssign(brokerList, topics);233 break;234 default:235 PrintUsage();236 break;237 }238 }239 }240 }