This topic describes how to use the SDK for C# to connect to the SSL endpoint of a Message Queue for Apache Kafka instance and use the PLAIN mechanism to send and consume messages over the Internet.

Prerequisites

.NET is installed. For more information, see Download .NET.

Install the C# library

Run the following command to install the C# library:
dotnet add package -v 1.5.2 Confluent.Kafka

Preparations

Send messages

  1. Create a producer program named producer.cs that contains the following code:
    using System;
    using Confluent.Kafka;
    
    class Producer
    {
        public static void Main(string[] args)
        {
            var conf = new ProducerConfig {
                BootstrapServers = "XXX,XXX,XXX",
                SslCaLocation = "XXX/ca-cert.pem",
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslUsername = "XXX",
                SaslPassword = "XXX",
                };
    
            Action<DeliveryReport<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");
    
            string topic ="XXX";
    
            using (var p = new ProducerBuilder<Null, string>(conf).Build())
            {
                for (int i=0; i<100; ++i)
                {
                    p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
                }
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
    Parameter Description
    BootstrapServers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    SslCaLocation The path of the SSL root certificate.
    SaslUsername The username of the Simple Authentication and Security Layer (SASL) user.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    SaslPassword The password of the SASL user.
    topic The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
  2. Run the following command to run producer.cs to send messages:
    dotnet run producer.cs

Consume messages

  1. Create a consumer program named consumer.cs that contains the following code:
    using System;
    using System.Threading;
    using Confluent.Kafka;
    
    class Consumer
    {
        public static void Main(string[] args)
        {
            var conf = new ConsumerConfig {
                GroupId = "XXX",
                BootstrapServers = "XXX,XXX,XXX",
                SslCaLocation = "XXX/ca-cert.pem",
                SaslMechanism = SaslMechanism.Plain,
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslUsername = "XXX",
                SaslPassword = "XXX",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
    
            string topic = "XXX";
    
            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe(topic);
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true;
                    cts.Cancel();
                };
    
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    c.Close();
                }
            }
        }
    }
    Parameter Description
    GroupId The name of the consumer group. Group You can obtain the name of the consumer group on the Groups page in the Message Queue for Apache Kafka console.
    BootstrapServers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    SslCaLocation The path of the SSL root certificate.
    SaslUsername The username of the SASL user.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    SaslPassword The password of the SASL user.
    topic The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
  2. Run the following command to run consumer.cs consume messages:
    dotnet run consumer.cs