ApsaraMQ for Kafka: Use the SDK for C# to send and receive messages

Last Updated: Mar 15, 2024

This topic describes how to use the SDK for C# to connect to ApsaraMQ for Kafka to send and receive messages.

Environment requirements

.NET is installed. To download .NET, visit the .NET download page.

Install the C# library

Run the following command to install the C# library:

dotnet add package Confluent.Kafka -v 1.5.2

Create configuration files

  1. (Optional) Download the Secure Sockets Layer (SSL) root certificate. If you use the SSL endpoint to connect to your ApsaraMQ for Kafka instance, you must install the certificate.

  2. Configure the producer.cs and consumer.cs files.

    Table 1. Parameters

    Parameter

    Description

    BootstrapServers

    The endpoint of the ApsaraMQ for Kafka instance. The sample code uses the SSL endpoint. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    SslCaLocation

    The path of the SSL root certificate that you downloaded. This parameter is required only if you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance.

    SaslMechanism

    The security mechanism that you want to use to send and receive messages.

    • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SaslMechanism.Plain.

    • If you use the Simple Authentication and Security Layer (SASL) endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SaslMechanism.Plain to specify the PLAIN mechanism or set this parameter to SaslMechanism.ScramSha256 to specify the Salted Challenge Response Authentication Mechanism (SCRAM).

    SecurityProtocol

    The security protocol that you want to use to send and receive messages.

    • If you use the SSL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SecurityProtocol.SaslSsl.

    • If you use the SASL endpoint to connect to the ApsaraMQ for Kafka instance, set this parameter to SecurityProtocol.SaslPlaintext, regardless of whether the PLAIN or SCRAM mechanism is used.

    SaslUsername

    The username of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter does not apply.

    Note
    • If the ACL feature is not enabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the SASL user from the Username and Password parameters in the Configuration Information section of the Instance Details page in the ApsaraMQ for Kafka console.

    • If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is authorized to send and receive messages by using the instance. For more information, see Grant permissions to SASL users.

    SaslPassword

    The password of the SASL user. If you use the default endpoint to connect to the ApsaraMQ for Kafka instance, this parameter does not apply.

    topic

    The topic name. You can obtain the topic name on the Topics page in the ApsaraMQ for Kafka console.

    GroupId

    The group ID. You can obtain the group ID on the Groups page in the ApsaraMQ for Kafka console.
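
The endpoint-specific rows of Table 1 can be summarized in a small helper. The following is a minimal sketch, not part of the official sample: the EndpointKind enum and the EndpointSettings.Apply method are hypothetical names introduced here for illustration. The sketch relies on ProducerConfig and ConsumerConfig both deriving from ClientConfig in Confluent.Kafka, so that one method can set the shared properties on either.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical enum for this sketch; it is not part of Confluent.Kafka.
enum EndpointKind { Default, Ssl, SaslPlain, SaslScram }

static class EndpointSettings
{
    // Applies the Table 1 settings for one endpoint type to a producer or consumer config.
    public static T Apply<T>(T conf, EndpointKind kind,
        string user = null, string password = null, string caPath = null)
        where T : ClientConfig
    {
        switch (kind)
        {
            case EndpointKind.Default:
                // Default endpoint: the SASL username and password do not apply.
                return conf;
            case EndpointKind.Ssl:
                conf.SecurityProtocol = SecurityProtocol.SaslSsl;
                conf.SaslMechanism = SaslMechanism.Plain;
                conf.SslCaLocation = caPath; // needed only for the SSL endpoint
                break;
            case EndpointKind.SaslPlain:
                conf.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                conf.SaslMechanism = SaslMechanism.Plain;
                break;
            case EndpointKind.SaslScram:
                conf.SecurityProtocol = SecurityProtocol.SaslPlaintext;
                conf.SaslMechanism = SaslMechanism.ScramSha256;
                break;
        }
        conf.SaslUsername = user;
        conf.SaslPassword = password;
        return conf;
    }
}

class Demo
{
    static void Main()
    {
        // SASL endpoint with SCRAM, applied to a producer config.
        var producerConf = EndpointSettings.Apply(
            new ProducerConfig { BootstrapServers = "XXX,XXX,XXX" },
            EndpointKind.SaslScram, user: "XXX", password: "XXX");

        // SSL endpoint, applied to a consumer config.
        var consumerConf = EndpointSettings.Apply(
            new ConsumerConfig { BootstrapServers = "XXX,XXX,XXX", GroupId = "XXX" },
            EndpointKind.Ssl, "XXX", "XXX", "XXX/only-4096-ca-cert.pem");

        Console.WriteLine($"{producerConf.SecurityProtocol} / {producerConf.SaslMechanism}");
        Console.WriteLine($"{consumerConf.SecurityProtocol} / {consumerConf.SaslMechanism}");
    }
}
```

Keeping the mapping in one place means that switching endpoints changes a single argument instead of several properties in both producer.cs and consumer.cs. The sample code in this topic also sets SslEndpointIdentificationAlgorithm for the SSL endpoint; the sketch leaves that out for brevity.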

Send messages

Run the following command to execute producer.cs and send messages:

dotnet run producer.cs

The following sample code provides an example of producer.cs.

For information about the parameters in the sample code, see Parameters.

Important

In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the ApsaraMQ for Kafka instance.

using System;
using Confluent.Kafka;

class Producer
{
    public static void Main(string[] args)
    {
        // Connection settings for the SSL endpoint. See Table 1 for each parameter.
        var conf = new ProducerConfig {
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            // Disables hostname verification of the broker certificate.
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
        };

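        // Called once per message with the delivery result or error.
        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        string topic = "XXX";

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            // Produce is asynchronous: it queues each message and returns immediately.
            for (int i = 0; i < 100; ++i)
            {
                p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
            }
            // Wait up to 10 seconds for queued messages to be delivered before disposing.
            p.Flush(TimeSpan.FromSeconds(10));
        }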
    }
}
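
The sample above reports delivery results through a callback. When each result needs to be awaited individually, the same client also offers ProduceAsync. The following is a minimal sketch of that alternative, assuming the same SSL endpoint settings as the sample; the class name AsyncProducer and the message value "hello" are placeholders chosen for illustration and do not come from the official sample.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class AsyncProducer
{
    public static async Task Main(string[] args)
    {
        // Same SSL endpoint settings as in producer.cs.
        var conf = new ProducerConfig {
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
        };

        string topic = "XXX";

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            try
            {
                // The task completes when the broker acknowledges the message.
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "hello" });
                Console.WriteLine($"Delivered message to {dr.TopicPartitionOffset}");
            }
            catch (ProduceException<Null, string> e)
            {
                // Delivery failures surface as exceptions instead of a callback argument.
                Console.WriteLine($"Delivery Error: {e.Error.Reason}");
            }
        }
    }
}
```

Awaiting every message one at a time limits throughput, so the callback form in producer.cs is likely the better fit for sending many messages in a loop.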

Receive messages

Run the following command to execute consumer.cs and receive messages:

dotnet run consumer.cs

The following sample code provides an example of consumer.cs.

For information about the parameters in the sample code, see Parameters.

Important

In the sample code, the SSL endpoint is used. Delete or modify the code related to the parameters based on the endpoint that you use to connect to the ApsaraMQ for Kafka instance.

using System;
using System.Threading;
using Confluent.Kafka;

class Consumer
{
    public static void Main(string[] args)
    {
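        // Connection settings for the SSL endpoint. See Table 1 for each parameter.
        var conf = new ConsumerConfig {
            GroupId = "XXX",
            BootstrapServers = "XXX,XXX,XXX",
            SslCaLocation = "XXX/only-4096-ca-cert.pem",
            SaslMechanism = SaslMechanism.Plain,
            // Disables hostname verification of the broker certificate.
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "XXX",
            SaslPassword = "XXX",
            // Start from the earliest offset when the group has no committed offset.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };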

        string topic = "XXX";

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            // Cancel the consume loop on Ctrl+C instead of terminating the process.
            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Commit final offsets (auto-commit is on by default) and leave the group cleanly.
                c.Close();
            }
        }
    }
}