このトピックでは、Go 用 SDK を使用して Advanced Message Queuing Protocol(AMQP)クライアントを Alibaba Cloud IoT Platform に接続し、サーバー側サブスクリプション機能を使用して IoT Platform からメッセージを受信する方法について説明します。
前提条件
トピックのメッセージをサブスクライブするコンシューマーグループの ID を取得します。
DEFAULT_GROUP という名前のデフォルトのコンシューマーグループを使用するか、IoT Platform コンソールでコンシューマーグループを作成できます。詳細については、「コンシューマーグループを管理する」をご参照ください。
コンシューマーグループを使用して、トピックのメッセージをサブスクライブできます。詳細については、「AMQP サーバー側サブスクリプションを設定する」をご参照ください。
開発環境
この例では、Go 1.12.7 を使用しています。
SDK のダウンロード
次のコマンドを実行して、Go 用 AMQP SDK をインポートできます。
import "pack.ag/amqp"
SDK の使用方法の詳細については、「package amqp」をご参照ください。
サンプルコード
package main
import (
"os"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
// パラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: IoT Platform インスタンスの ID。
const iotInstanceId = "${YourIotInstanceId}"
// エンドポイント。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
const host = "${YourHost}"
func main() {
// AccessKey ペアをプロジェクトコードにハードコードすると、プロジェクトコードが漏洩した場合に AccessKey ペアが公開される可能性があります。この場合、アカウント内のリソースは安全ではなくなります。次のサンプルコードは、環境変数から AccessKey ペアを取得する方法の例を示しています。この例は参照用です。
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
// userName パラメータの構造。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
// 署名を計算します。パスワードの構成方法の詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
// メッセージ受信機能を有効または無効にする必要がある場合は、Background() 関数を使用してコンテキストを作成できます。
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
// ビジネスロジックを実装する関数。この関数は、非同期的に呼び出されるユーザー定義関数です。この関数を設定する前に、システムリソースの消費量を検討することをお勧めします。
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("受信データ:", string(message.GetData()), " プロパティ:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
// メッセージ受信をブロックします。ctx が Background() 関数に基づいて作成された新しいコンテキストの場合、メッセージ受信はブロックされません。
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp 受信データエラー:", err)
// メッセージ受信が手動で無効になっている場合は、プログラムを終了します。
select {
case <- childCtx.Done(): return
default:
}
// メッセージ受信が手動で無効になっていない場合は、接続を再確立します。
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
// 指数バックオフでリトライします。10 ミリ秒から 20 秒まで。
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
// 例外が発生した場合は、指数バックオフでリトライします。
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp 接続リトライ、回数:", times, ",期間:", duration)
times ++
} else {
fmt.Println("amqp 接続初期化成功")
return nil
}
}
}
// パケットが使用できないため、接続とセッションの状態を判断できません。情報を取得するには、接続を再確立します。
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
// ネットワーク切断エラーが発生した場合、接続は閉じられ、セッションの確立に失敗します。それ以外の場合、接続は確立されます。
if err == nil {
am.receiver = receiver
return nil
}
}
// 以前の接続を削除します。
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}
次の表のパラメータの説明に基づいて、上記のコードのパラメータを設定できます。その他のパラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」をご参照ください。
有効なパラメータ値を指定していることを確認してください。そうしないと、AMQP クライアントは IoT Platform に接続できません。
パラメータ | 説明 |
accessKey | IoT Platform コンソールにログインし、右上隅のプロファイル画像にポインタを移動して、[accesskey 管理] をクリックして、AccessKey ID と AccessKey シークレットを取得します。 説明 Resource Access Management(RAM)ユーザーを使用する場合は、AliyunIOTFullAccess ポリシーを RAM ユーザーにアタッチする必要があります。このポリシーにより、RAM ユーザーは IoT Platform リソースを管理できます。そうしないと、IoT Platform への接続に失敗します。詳細については、「RAM ユーザーとして IoT Platform にアクセスする」をご参照ください。 |
accessSecret | |
consumerGroupId | IoT Platform インスタンスのコンシューマーグループの ID。 コンシューマーグループの ID を表示するには、次の手順を実行します。IoT Platform コンソールにログインし、管理するインスタンスのカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。コンシューマーグループの ID は、[コンシューマーグループ] タブに表示されます。 |
iotInstanceId | IoT Platform インスタンスの ID です。[概要] タブの IoT Platform コンソール でインスタンス ID を表示できます。
|
clientId | クライアントの ID。カスタム ID を指定する必要があります。ID は 1 ~ 64 文字の長さでなければなりません。クライアントが実行されているサーバーの UUID、MAC アドレス、IP アドレスなど、一意の識別子をクライアント ID として使用することをお勧めします。 AMQP クライアントが IoT Platform に接続して起動した後、次の手順を実行してクライアントの詳細を表示します。IoT Platform コンソールにログインし、管理する インスタンス のカードをクリックします。左側のナビゲーションウィンドウで、 を選択します。[コンシューマーグループ] タブで、管理するコンシューマーグループを見つけ、[アクション] 列の [表示] をクリックします。各クライアントの ID は、[コンシューマーグループステータス] タブに表示されます。クライアント ID を使用すると、クライアントを簡単に識別できます。 |
host | AMQP クライアントが IoT Platform に接続するために使用するエンドポイント。
|
サンプル結果
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続されており、メッセージを受信できます。
次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続できません。
ログに基づいてコードまたはネットワーク環境を確認し、問題を解決してから、コードを再度実行できます。
関連情報
サーバー側サブスクリプション機能に関連するエラーコードの詳細については、「IoT プラットフォームログ」トピックの「メッセージに関連するエラーコード」セクションをご参照ください。