このトピックでは、C# 用 SDK を使用して ApsaraMQ for Kafka に接続し、メッセージを送受信する方法について説明します。
環境要件
.NET がインストールされていること。 詳細については、「Download .NET」をご参照ください。
C# ライブラリのインストール
C# ライブラリをインストールするには、次のコマンドを実行します。
dotnet add package -v 1.5.2 Confluent.Kafka
構成ファイルの作成
(オプション) Secure Sockets Layer (SSL) ルート証明書をダウンロードします。 SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、証明書をインストールする必要があります。
producer.cs ファイルと consumer.cs ファイルを構成します。
表 1. パラメーター
パラメーター
説明
BootstrapServers
ApsaraMQ for Kafka インスタンスの SSL エンドポイント。 エンドポイントは、アクセスポイント情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションで取得できます。
SslCaLocation
ダウンロードした SSL ルート証明書のパス。 このパラメーターは、SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合にのみ必須です。
SaslMechanism
メッセージの送受信に使用するセキュリティメカニズム。
SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SaslMechanism.Plain に設定します。
Simple Authentication and Security Layer (SASL) エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SaslMechanism.Plain に設定して PLAIN メカニズムを指定するか、SaslMechanism.ScramSha256 に設定して Salted Challenge Response Authentication Mechanism (SCRAM) を指定します。
SecurityProtocol
メッセージの送受信に使用するセキュリティプロトコル。
SSL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターを SecurityProtocol.SaslSsl に設定します。
SASL エンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、PLAIN メカニズムを使用する場合はこのパラメーターを SecurityProtocol.SaslPlaintext に設定し、SCRAM メカニズムを使用する場合は SecurityProtocol.SaslPlaintext に設定します。
SaslUsername
SASL ユーザーのユーザー名。 デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターは使用できません。
説明ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっていない場合は、ユーザー名パスワード設定情報インスタンスの詳細ApsaraMQ for Kafka コンソール の ページの セクションにある パラメーターと パラメーターから、SASL ユーザーのユーザー名とパスワードを取得できます。
ApsaraMQ for Kafka インスタンスで ACL 機能が有効になっている場合は、インスタンスを使用してメッセージを送受信する権限が SASL ユーザーに付与されていることを確認してください。 詳細については、「SASL ユーザーへの権限の付与」をご参照ください。
SaslPassword
SASL ユーザーのパスワード。 デフォルトエンドポイントを使用して ApsaraMQ for Kafka インスタンスに接続する場合は、このパラメーターは使用できません。
topic
トピック名。 トピック管理 ページ (ApsaraMQ for Kafka コンソール) でトピック名を取得できます。
GroupId
グループ ID。 Group の管理 ページ (ApsaraMQ for Kafka コンソール) でグループ ID を取得できます。
メッセージの送信
producer.cs を実行してメッセージを送信するには、次のコマンドを実行します。
dotnet run producer.cs
次のサンプルコードは、producer.cs の例を示しています。
サンプルコードのパラメーターについては、「パラメーター」をご参照ください。
サンプルコードでは、SSL エンドポイントが使用されています。 ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更してください。
using System;
using Confluent.Kafka;
class Producer
{
public static void Main(string[] args)
{
var conf = new ProducerConfig {
BootstrapServers = "XXX,XXX,XXX", // ブートストラップサーバー
SslCaLocation = "XXX/only-4096-ca-cert.pem", // SSL 証明書のパス
SaslMechanism = SaslMechanism.Plain, // SASL メカニズム
SecurityProtocol = SecurityProtocol.SaslSsl, // セキュリティプロトコル
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
SaslUsername = "XXX", // SASL ユーザー名
SaslPassword = "XXX", // SASL パスワード
};
Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}" // メッセージが配信されました
: $"Delivery Error: {r.Error.Reason}"); // 配信エラー
string topic ="XXX"; // トピック名
using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
for (int i=0; i<100; ++i)
{
p.Produce(topic, new Message<Null, string> { Value = i.ToString() }, handler);
}
p.Flush(TimeSpan.FromSeconds(10));
}
}
}
メッセージの受信
consumer.cs を実行してメッセージを受信するには、次のコマンドを実行します。
dotnet run consumer.cs
次のサンプルコードは、consumer.cs の例を示しています。
サンプルコードのパラメーターについては、「パラメーター」をご参照ください。
サンプルコードでは、SSL エンドポイントが使用されています。 ApsaraMQ for Kafka インスタンスへの接続に使用するエンドポイントに基づいて、パラメーターに関連するコードを削除または変更してください。
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig {
GroupId = "XXX", // グループ ID
BootstrapServers = "XXX,XXX,XXX", // ブートストラップサーバー
SslCaLocation = "XXX/only-4096-ca-cert.pem", // SSL 証明書のパス
SaslMechanism = SaslMechanism.Plain, // SASL メカニズム
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
SecurityProtocol = SecurityProtocol.SaslSsl, // セキュリティプロトコル
SaslUsername = "XXX", // SASL ユーザー名
SaslPassword = "XXX", // SASL パスワード
AutoOffsetReset = AutoOffsetReset.Earliest
};
string topic = "XXX"; // トピック名
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); // メッセージが消費されました
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}"); // エラーが発生しました
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}