すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:C# 用 SDK を使用してメッセージを送受信する

最終更新日:Mar 19, 2025

このトピックでは、C# 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。

環境要件

.NET がインストールされていること。 詳細については、「Download .NET」をご参照ください。

C# ライブラリのインストール

C# ライブラリをインストールするには、次のコマンドを実行します。

dotnet add package -v 1.5.2 Confluent.Kafka

構成ファイルの作成

  1. (オプション) Secure Sockets Layer (SSL) ルート証明書をダウンロードします。 SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。

  2. producer.cs ファイルと consumer.cs ファイルを構成します。

    表 1. パラメーター

    パラメーター

    説明

    BootstrapServers

    ApsaraMQ for Kafka インスタンスの SSL エンドポイント。 エンドポイントは、アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションで取得できます。

    SslCaLocation

    ダウンロードした SSL ルート証明書のパス。 このパラメーターは、SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合にのみ必須です。

    SaslMechanism

    メッセージの送受信に使用するセキュリティメカニズム。

    • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SaslMechanism.Plain に設定します。

    • Simple Authentication and Security Layer (SASL) エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SaslMechanism.Plain に設定して PLAIN メカニズムを指定するか、SaslMechanism.ScramSha256 に設定して Salted Challenge Response Authentication Mechanism (SCRAM) を指定します。

    SecurityProtocol

    メッセージの送受信に使用するセキュリティプロトコル。

    • SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SecurityProtocol.SaslSsl に設定します。

    • SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、PLAIN メカニズムを使用する場合はこのパラメーターを SecurityProtocol.SaslPlaintext に設定し、SCRAM メカニズムを使用する場合は SecurityProtocol.SaslPlaintext に設定します。

    SaslUsername

    SASL ユーザーのユーザー名。 デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターは使用できません。

    説明
    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。

    • ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。 詳細については、「SASL ユーザーへの権限の付与」をご参照ください。

    SaslPassword

    SASL ユーザーのパスワード。 デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターは使用できません。

    topic

    トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピック名を取得できます。

    GroupId

    グループ ID。 Group の管理 ページ (ApsaraMQ for Kafka コンソール) でグループ ID を取得できます。

メッセージの送信

producer.cs を実行してメッセージを送信するには、次のコマンドを実行します。

dotnet run producer.cs

次のサンプルコードは、producer.cs の例を示しています。

サンプルコードのパラメーターについては、「パラメーター」をご参照ください。

重要

サンプルコードでは、SSL エンドポイントが使用されています。 ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更してください。

using System;
using Confluent.Kafka;

class Producer
{
    public static void Main(string[] args)
    {
        var conf = new ProducerConfig {
            BootstrapServers = "XXX,XXX,XXX", // ブートストラップサーバー
            SslCaLocation = "XXX/only-4096-ca-cert.pem", // SSL 証明書のパス
            SaslMechanism = SaslMechanism.Plain, // SASL メカニズム
            SecurityProtocol = SecurityProtocol.SaslSsl, // セキュリティプロトコル
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SaslUsername = "XXX", // SASL ユーザー名
            SaslPassword = "XXX", // SASL パスワード
            };

        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}" // メッセージが配信されました
                : $"Delivery Error: {r.Error.Reason}"); // 配信エラー

        string topic ="XXX"; // トピック名

        using (var p = new ProducerBuilder<Null, string>(conf).Build())
        {
            for (int i=0; i<100; ++i)
            {
                p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
            }
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

メッセージの受信

consumer.cs を実行してメッセージを受信するには、次のコマンドを実行します。

dotnet run consumer.cs

次のサンプルコードは、consumer.cs の例を示しています。

サンプルコードのパラメーターについては、「パラメーター」をご参照ください。

重要

サンプルコードでは、SSL エンドポイントが使用されています。 ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更してください。

using System;
using System.Threading;
using Confluent.Kafka;

class Consumer
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig {
            GroupId = "XXX", // グループ ID
            BootstrapServers = "XXX,XXX,XXX", // ブートストラップサーバー
            SslCaLocation = "XXX/only-4096-ca-cert.pem", // SSL 証明書のパス
            SaslMechanism = SaslMechanism.Plain, // SASL メカニズム
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            SecurityProtocol = SecurityProtocol.SaslSsl, // セキュリティプロトコル
            SaslUsername = "XXX", // SASL ユーザー名
            SaslPassword = "XXX", // SASL パスワード
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        string topic = "XXX"; // トピック名

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); // メッセージが消費されました
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}"); // エラーが発生しました
                    }
                }
            }
            catch (OperationCanceledException)
            {
                c.Close();
            }
        }
    }
}