本文介绍如何在VPC环境下通过SASL接入点接入消息队列Kafka版并使用PLAIN机制收发消息。
前提条件
您已安装.NET。更多信息,请参见安装.NET。
安装C#依赖库
执行以下命令安装C#依赖库。
dotnet add package -v 1.5.2 Confluent.Kafka
发送消息
- 创建发送消息程序producer.cs。
using System; using Confluent.Kafka; class Producer { public static void Main(string[] args) { var conf = new ProducerConfig { BootstrapServers = "XXX,XXX,XXX", SaslMechanism = SaslMechanism.Plain, SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslUsername = "XXX", SaslPassword = "XXX", }; Action<DeliveryReport<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); string topic ="XXX"; using (var p = new ProducerBuilder<Null, string>(conf).Build()) { for (int i=0; i<100; ++i) { p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler); } p.Flush(TimeSpan.FromSeconds(10)); } } }
参数 描述 BootstrapServers SASL接入点。您可在消息队列Kafka版控制台的实例详情页面的接入点信息区域获取。 SaslUsername SASL用户名。 说明- 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面的配置信息区域获取默认的用户名和密码。
- 如果实例已开启ACL,请确保要使用的SASL用户已被授予向消息队列Kafka版实例收发消息的权限。具体操作,请参见SASL用户授权。
SaslPassword SASL用户名密码。 topic Topic名称。您可在消息队列Kafka版控制台的Topic 管理页面获取。 - 执行以下命令发送消息。
dotnet run producer.cs
订阅消息
- 创建订阅消息程序consumer.cs。
using System; using System.Threading; using Confluent.Kafka; class Consumer { public static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = "XXX", BootstrapServers = "XXX,XXX,XXX", SaslMechanism = SaslMechanism.Plain, SecurityProtocol = SecurityProtocol.SaslPlaintext, SaslUsername = "XXX", SaslPassword = "XXX", AutoOffsetReset = AutoOffsetReset.Earliest }; string topic = "XXX"; using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) { c.Subscribe(topic); CancellationTokenSource cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; try { while (true) { try { var cr = c.Consume(cts.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } } } catch (OperationCanceledException) { c.Close(); } } } }
参数 描述 GroupId Group名称。您可在消息队列Kafka版控制台的Group 管理页面获取。 BootstrapServers SASL接入点。您可在消息队列Kafka版控制台的实例详情页面的接入点信息区域获取。 SaslUsername SASL用户名。 说明- 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面的配置信息区域获取默认的用户名和密码。
- 如果实例已开启ACL,请确保要使用的SASL用户已被授予向消息队列Kafka版实例收发消息的权限。具体操作,请参见SASL用户授权。
SaslPassword SASL用户名密码。 topic Topic名称。您可在消息队列Kafka版控制台的Topic 管理页面获取。 - 执行以下命令消费消息。
dotnet run consumer.cs