This topic describes how to use the SDK for C# to connect to an endpoint of a Message Queue for Apache Kafka instance and send and subscribe to messages.

Environment requirements

.NET is installed. For more information, see Download .NET.

Install the C# library

Run the following command to install the C# library:
dotnet add package -v 1.5.2 Confluent.Kafka

Prepare configuration files

  1. Optional:Download the SSL root certificate. If you use the SSL endpoint to connect to your Message Queue for Apache Kafka instance, you must download this certificate.
  2. Configure the producer.cs and consumer.cs files.
    Table 1. Parameters
    Parameter Description
    BootstrapServers The SSL endpoint of the Message Queue for Apache Kafka instance. You can obtain the SSL endpoint in the Endpoint Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    SslCaLocation The path of the SSL root certificate that you downloaded. This parameter is required only if you use the SSL endpoint to connect to the Message Queue for Apache Kafka instance.
    SaslMechanism The security mechanism that is used to send and subscribe to messages.
    • If you use the SSL endpoint to connect to the Message Queue for Apache Kafka instance, set this parameter to SaslMechanism.Plain.
    • If you use the SASL endpoint to connect to the Message Queue for Apache Kafka instance, set this parameter to SaslMechanism.Plain to specify the PLAIN mechanism or set this parameter to SaslMechanism.ScramSha256 to specify the Salted Challenge Response Authentication Mechanism (SCRAM) mechanism.
    SecurityProtocol The security protocol that is used to send and subscribe to messages.
    • If you use the SSL endpoint to connect to the Message Queue for Apache Kafka instance, set this parameter to SecurityProtocol.SaslSsl.
    • If you use the SASL endpoint to connect to the Message Queue for Apache Kafka instance, set this parameter to SecurityProtocol.SaslPlaintext when the PLAIN mechanism is used, or set this parameter to SecurityProtocol.SaslPlaintext when the SCRAM mechanism is used.
    SaslUsername The username of the Simple Authentication and Security Layer (SASL) user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    Note
    • If the ACL feature is not enabled for your Message Queue for Apache Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the Message Queue for Apache Kafka console.
    • If the ACL feature is enabled for your Message Queue for Apache Kafka instance, make sure that the SASL user is authorized to send and consume messages by using the instance. For more information, see Grant permissions to SASL users.
    SaslPassword The password of the SASL user. If you use the default endpoint to connect to the Message Queue for Apache Kafka instance, this parameter is excluded.
    topic The name of the topic. You can obtain the name of the topic on the Topics page in the Message Queue for Apache Kafka console.
    GroupId The ID of the consumer group. You can obtain the ID of the consumer group on the Groups page in the Message Queue for Apache Kafka console.

Send messages

Run the following command to run producer.cs to send messages:

dotnet run producer.cs

The following sample code provides an example of producer.cs:

For information about the parameters in the sample code, see Parameters.
Notice In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the Message Queue for Apache Kafka instance.
using System;
using Confluent.Kafka;

class Producer
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig {
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
            };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        string topic ="XXX";

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i=0; i<100; ++i)
            {
                p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
            }
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

Subscribe to messages

Run the following command to run consumer.cs to subscribe to messages:

dotnet run consumer.cs

The following sample code provides an example of consumer.cs:

For information about the parameters in the sample code, see Parameters.
Notice In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the Message Queue for Apache Kafka instance.
using System;
using System.Threading;
using Confluent.Kafka;

class Consumer
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig {
            GroupId = "XXX",
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        string topic = "XXX";

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                c.Close();
            }
        }
    }
}