このトピックでは、.NET 用 SDK を使用して Advanced Message Queuing Protocol(AMQP)クライアントを Alibaba Cloud IoT Platform に接続し、サーバー側サブスクリプション機能を使用して IoT Platform からメッセージを受信する方法について説明します。
前提条件
トピックのメッセージをサブスクライブするコンシューマーグループの ID を取得します。
DEFAULT_GROUP という名前のデフォルトのコンシューマーグループを使用するか、IoT Platform コンソールでコンシューマーグループを作成できます。詳細については、「コンシューマーグループを管理する」をご参照ください。
コンシューマーグループを使用して、トピックのメッセージをサブスクライブできます。詳細については、「AMQP サーバー側サブスクリプションを設定する」をご参照ください。
開発環境
次の表に、開発環境の要件を示します。
フレームワーク | サポートされているバージョン |
.NET Framework | 3.5、4.0、4.5 以降 |
.NET Micro Framework | 4.2 以降 |
.NET nanoFramework | 1.0 以降 |
.NET Compact Framework | 3.9 以降 |
Windows 10 および Ubuntu 14.04 上の .NET Core | 1.0 以降 |
Mono | 4.2.1 以降 |
SDK をダウンロードする
AMQP.Net Lite ライブラリを使用することをお勧めします。ライブラリをダウンロードして手順を確認するには、AMQP.Net Lite にアクセスしてください。
依存関係を追加する
次の依存関係を packages.config ファイルに追加します。
<packages>
<package id="AMQPNetLite" version="2.2.0" targetFramework="net47" />
</packages>サンプルコード
using System;
using System.Text;
using Amqp;
using Amqp.Sasl;
using Amqp.Framing;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Security.Cryptography;
namespace amqp
{
class MainClass
{
// エンドポイント。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
static string Host = "${YourHost}";
static int Port = 5671;
// AccessKey ペアをプロジェクトコードにハードコードすると、プロジェクトコードが漏洩した場合に AccessKey ペアが公開される可能性があります。この場合、アカウント内のリソースは安全ではなくなります。次のサンプルコードは、環境変数から AccessKey ペアを取得する方法の例を示しています。この例は参照用です。
static string AccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
static string AccessSecret = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
static string consumerGroupId = "${YourConsumerGroupId}";
static string clientId = "${YourClientId}";
// iotInstanceId: IoT Platform インスタンスの ID。
static string iotInstanceId = "${YourIotInstanceId}";
static int Count = 0;
static int IntervalTime = 10000;
static Address address;
public static void Main(string[] args)
{
long timestamp = GetCurrentMilliseconds();
string param = "authId=" + AccessKey + "×tamp=" + timestamp;
// userName パラメータの構造。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
string userName = clientId + "|authMode=aksign,signMethod=hmacmd5,consumerGroupId=" + consumerGroupId
+ ",iotInstanceId=" + iotInstanceId + ",authId=" + AccessKey + ",timestamp=" + timestamp + "|";
// 署名を計算します。パスワードの構成方法の詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
string password = doSign(param, AccessSecret, "HmacMD5");
DoConnectAmqp(userName, password);
ManualResetEvent resetEvent = new ManualResetEvent(false);
resetEvent.WaitOne();
}
static void DoConnectAmqp(string userName, string password)
{
address = new Address(Host, Port, userName, password);
// 接続を作成します。
ConnectionFactory cf = new ConnectionFactory();
// 必要に応じて、ローカルの TLS 証明書を使用します。
//cf.SSL.ClientCertificates.Add(GetCert());
//cf.SSL.RemoteCertificateValidationCallback = ValidateServerCertificate;
cf.SASL.Profile = SaslProfile.External;
cf.AMQP.IdleTimeout = 120000;
// cf.AMQP.ContainerId パラメータと cf.AMQP.HostName パラメータを設定します。
cf.AMQP.ContainerId = "client.1.2";
cf.AMQP.HostName = "contoso.com";
cf.AMQP.MaxFrameSize = 8 * 1024;
var connection = cf.CreateAsync(address).Result;
// 接続例外機能は無効になっています。
connection.AddClosedCallback(ConnClosed);
// メッセージを受信します。
DoReceive(connection);
}
static void DoReceive(Connection connection)
{
// セッションを作成します。
var session = new Session(connection);
// レシーバーリンクを作成し、メッセージを受信します。
var receiver = new ReceiverLink(session, "queueName", null);
receiver.Start(20, (link, message) =>
{
object messageId = message.ApplicationProperties["messageId"];
object topic = message.ApplicationProperties["topic"];
string body = Encoding.UTF8.GetString((Byte[])message.Body);
// 注: 時間のかかるロジックを実装しないでください。ビジネスロジックを指定する必要がある場合は、新しいスレッドを使用してください。そうしないと、メッセージの消費がブロックされる可能性があります。消費のレイテンシが原因で、メッセージが再送信される場合があります。
Console.WriteLine("receive message, topic=" + topic + ", messageId=" + messageId + ", body=" + body);
// ACK メッセージを送信します。
link.Accept(message);
});
}
// 例外が発生した場合、クライアントは IoT Platform への再接続を試みます。
// 指数バックオフアルゴリズムを使用して、例外処理メカニズムと再接続ポリシーを改善できます。
static void ConnClosed(IAmqpObject _, Error e)
{
Console.WriteLine("ocurr error: " + e);
if(Count < 3)
{
Count += 1;
Thread.Sleep(IntervalTime * Count);
}
else
{
Thread.Sleep(120000);
}
// 接続を再確立します。
DoConnectAmqp(address.User, address.Password);
}
static X509Certificate GetCert()
{
string certPath = Environment.CurrentDirectory + "/root.crt";
X509Certificate crt = new X509Certificate(certPath);
return crt;
}
static bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
static long GetCurrentMilliseconds()
{
DateTime dt1970 = new DateTime(1970, 1, 1);
DateTime current = DateTime.Now;
return (long)(current - dt1970).TotalMilliseconds;
}
// 署名アルゴリズム。有効な値: hmacmd5、hmacsha1、hmacsha256。
static string doSign(string param, string accessSecret, string signMethod)
{
//signMethod = HmacMD5
byte[] key = Encoding.UTF8.GetBytes(accessSecret);
byte[] signContent = Encoding.UTF8.GetBytes(param);
var hmac = new HMACMD5(key);
byte[] hashBytes = hmac.ComputeHash(signContent);
return Convert.ToBase64String(hashBytes);
}
}
}次の表のパラメータの説明に基づいて、上記のコードのパラメータを設定できます。その他のパラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」をご参照ください。
有効なパラメータ値を指定していることを確認してください。そうしないと、AMQP クライアントは IoT Platform に接続できません。
パラメータ | 説明 |
Host | AMQP クライアントが IoT Platform に接続するために使用するエンドポイント。
|
AccessKey | IoT Platform コンソールにログインし、右上隅のプロファイル画像にポインタを移動して、[accesskey 管理] をクリックして、AccessKey ID と AccessKey シークレットを取得します。 説明 Resource Access Management(RAM)ユーザーを使用する場合は、AliyunIOTFullAccess ポリシーを RAM ユーザーにアタッチする必要があります。このポリシーにより、RAM ユーザーは IoT Platform リソースを管理できます。そうしないと、IoT Platform への接続は失敗します。詳細については、「RAM ユーザーとして IoT Platform にアクセスする」をご参照ください。 |
AccessSecret | |
consumerGroupId | IoT Platform インスタンスのコンシューマーグループの ID。 コンシューマーグループの ID を表示するには、次の手順を実行します。IoT Platform コンソールにログインし、管理するインスタンスのカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。コンシューマーグループの ID は、[コンシューマーグループ] タブに表示されます。 |
iotInstanceId | IoT Platform インスタンスの ID。IoT Platform コンソール の [概要] タブでインスタンス ID を表示できます。IoT Platform コンソール
|
clientId | クライアントの ID。カスタム ID を指定する必要があります。ID は 1 ~ 64 文字の長さである必要があります。クライアントを実行しているサーバーの UUID、MAC アドレス、IP アドレスなど、一意の識別子をクライアント ID として使用することをお勧めします。 AMQP クライアントが IoT Platform に接続して起動した後、次の手順を実行してクライアントの詳細を表示します。IoT Platform コンソールにログインし、管理する インスタンス のカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。[コンシューマーグループ] タブで、管理するコンシューマーグループを見つけ、[アクション] 列の [表示] をクリックします。各クライアントの ID は、[コンシューマーグループステータス] タブに表示されます。クライアント ID を使用すると、クライアントを簡単に識別できます。 |
サンプル結果
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続されており、メッセージを受信できます。
パラメータ
例
説明
topic
/***********/******/thing/event/property/post
デバイスプロパティを送信するために使用されるトピック。
messageId
2**************7
メッセージの ID。
body
{"deviceType":"CustomCategory","iotId":"4EwuVV***","requestId":"161268***","checkFailedData":{},"productKey":"g4***S","gmtCreate":1612682173249,"deviceName":"Esensor","items":{"temperature":{"value":-1,"time":1612682173247},"humidity":{"value":74,"time":1612682173247}}}
メッセージの内容。
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続できません。
関連情報
サーバー側サブスクリプション機能に関連するエラーコードの詳細については、「IoT プラットフォームログ」トピックの メッセージに関連するエラーコード セクションをご参照ください。