すべてのプロダクト
Search
ドキュメントセンター

IoT Platform:Go 用 SDK を使用してクライアントを IoT Platform に接続する

最終更新日:Mar 22, 2025

このトピックでは、Go 用 SDK を使用して Advanced Message Queuing Protocol(AMQP)クライアントを Alibaba Cloud IoT Platform に接続し、サーバー側サブスクリプション機能を使用して IoT Platform からメッセージを受信する方法について説明します。

前提条件

トピックのメッセージをサブスクライブするコンシューマーグループの ID を取得します。

開発環境

この例では、Go 1.12.7 を使用しています。

SDK のダウンロード

次のコマンドを実行して、Go 用 AMQP SDK をインポートできます。

import "pack.ag/amqp"

SDK の使用方法の詳細については、「package amqp」をご参照ください。

サンプルコード

package main

import (
	"os"
    "context"
    "crypto/hmac"
    "crypto/sha1"
    "encoding/base64"
    "fmt"
    "pack.ag/amqp"
    "time"
)
// パラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: IoT Platform インスタンスの ID。
const iotInstanceId = "${YourIotInstanceId}"
// エンドポイント。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
const host = "${YourHost}"

func main() {
	// AccessKey ペアをプロジェクトコードにハードコードすると、プロジェクトコードが漏洩した場合に AccessKey ペアが公開される可能性があります。この場合、アカウント内のリソースは安全ではなくなります。次のサンプルコードは、環境変数から AccessKey ペアを取得する方法の例を示しています。この例は参照用です。
	accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    address := "amqps://" + host + ":5671"
    timestamp := time.Now().Nanosecond() / 1000000
    // userName パラメータの構造。詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
    userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|", 
        clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
    stringToSign := fmt.Sprintf("authId=%s&timestamp=%d", accessKey, timestamp)
    hmacKey := hmac.New(sha1.New, []byte(accessSecret))
    hmacKey.Write([]byte(stringToSign))
    // 署名を計算します。パスワードの構成方法の詳細については、「AMQP クライアントを IoT Platform に接続する」トピックを参照してください。
    password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))

    amqpManager := &AmqpManager{
        address:address,
        userName:userName,
        password:password,
    }

    // メッセージ受信機能を有効または無効にする必要がある場合は、Background() 関数を使用してコンテキストを作成できます。
    ctx := context.Background()

    amqpManager.startReceiveMessage(ctx)
}

// ビジネスロジックを実装する関数。この関数は、非同期的に呼び出されるユーザー定義関数です。この関数を設定する前に、システムリソースの消費量を検討することをお勧めします。
func (am *AmqpManager) processMessage(message *amqp.Message) {
    fmt.Println("受信データ:", string(message.GetData()), " プロパティ:", message.ApplicationProperties)
}

type AmqpManager struct {
    address     string
    userName     string
    password     string
    client       *amqp.Client
    session     *amqp.Session
    receiver     *amqp.Receiver
}

func (am *AmqpManager) startReceiveMessage(ctx context.Context)  {

    childCtx, _ := context.WithCancel(ctx)
    err := am.generateReceiverWithRetry(childCtx)
    if nil != err {
        return
    }
    defer func() {
        am.receiver.Close(childCtx)
        am.session.Close(childCtx)
        am.client.Close()
    }()

    for {
        // メッセージ受信をブロックします。ctx が Background() 関数に基づいて作成された新しいコンテキストの場合、メッセージ受信はブロックされません。
        message, err := am.receiver.Receive(ctx)

        if nil == err {
            go am.processMessage(message)
            message.Accept()
        } else {
            fmt.Println("amqp 受信データエラー:", err)

            // メッセージ受信が手動で無効になっている場合は、プログラムを終了します。
            select {
            case <- childCtx.Done(): return
            default:
            }

            // メッセージ受信が手動で無効になっていない場合は、接続を再確立します。
            err := am.generateReceiverWithRetry(childCtx)
            if nil != err {
                return
            }
        }
    }
}

func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
    // 指数バックオフでリトライします。10 ミリ秒から 20 秒まで。
    duration := 10 * time.Millisecond
    maxDuration := 20000 * time.Millisecond
    times := 1

    // 例外が発生した場合は、指数バックオフでリトライします。
    for {
        select {
        case <- ctx.Done(): return amqp.ErrConnClosed
        default:
        }

        err := am.generateReceiver()
        if nil != err {
            time.Sleep(duration)
            if duration < maxDuration {
                duration *= 2
            }
            fmt.Println("amqp 接続リトライ、回数:", times, ",期間:", duration)
            times ++
        } else {
            fmt.Println("amqp 接続初期化成功")
            return nil
        }
    }
}

// パケットが使用できないため、接続とセッションの状態を判断できません。情報を取得するには、接続を再確立します。
func (am *AmqpManager) generateReceiver() error {

    if am.session != nil {
        receiver, err := am.session.NewReceiver(
            amqp.LinkSourceAddress("/queue-name"),
            amqp.LinkCredit(20),
        )
        // ネットワーク切断エラーが発生した場合、接続は閉じられ、セッションの確立に失敗します。それ以外の場合、接続は確立されます。
        if err == nil {
            am.receiver = receiver
            return nil
        }
    }

    // 以前の接続を削除します。
    if am.client != nil {
        am.client.Close()
    }

    client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
    if err != nil {
        return err
    }
    am.client = client

    session, err := client.NewSession()
    if err != nil {
        return err
    }
    am.session = session

    receiver, err := am.session.NewReceiver(
        amqp.LinkSourceAddress("/queue-name"),
        amqp.LinkCredit(20),
    )
    if err != nil {
        return err
    }
    am.receiver = receiver

    return nil
}

次の表のパラメータの説明に基づいて、上記のコードのパラメータを設定できます。その他のパラメータの詳細については、「AMQP クライアントを IoT Platform に接続する」をご参照ください。

重要

有効なパラメータ値を指定していることを確認してください。そうしないと、AMQP クライアントは IoT Platform に接続できません。

パラメータ

説明

accessKey

IoT Platform コンソールにログインし、右上隅のプロファイル画像にポインタを移動して、[accesskey 管理] をクリックして、AccessKey ID と AccessKey シークレットを取得します。

説明

Resource Access Management(RAM)ユーザーを使用する場合は、AliyunIOTFullAccess ポリシーを RAM ユーザーにアタッチする必要があります。このポリシーにより、RAM ユーザーは IoT Platform リソースを管理できます。そうしないと、IoT Platform への接続に失敗します。詳細については、「RAM ユーザーとして IoT Platform にアクセスする」をご参照ください。

accessSecret

consumerGroupId

IoT Platform インスタンスのコンシューマーグループの ID。

コンシューマーグループの ID を表示するには、次の手順を実行します。IoT Platform コンソールにログインし、管理するインスタンスのカードをクリックします。左側のナビゲーションウィンドウで、[メッセージ転送] > [サーバー側サブスクリプション] を選択します。コンシューマーグループの ID は、[コンシューマーグループ] タブに表示されます。

iotInstanceId

IoT Platform インスタンスの ID です。[概要] タブの IoT Platform コンソール でインスタンス ID を表示できます。

  • インスタンス ID が表示されている場合は、このパラメータをインスタンス ID に設定する必要があります。

  • [概要] タブが表示されていない場合、または インスタンスに ID がない場合は、iotInstanceId = "" の形式でこのパラメータを空のままにします。

clientId

クライアントの ID。カスタム ID を指定する必要があります。ID は 1 ~ 64 文字の長さでなければなりません。クライアントが実行されているサーバーの UUID、MAC アドレス、IP アドレスなど、一意の識別子をクライアント ID として使用することをお勧めします。

AMQP クライアントが IoT Platform に接続して起動した後、次の手順を実行してクライアントの詳細を表示します。IoT Platform コンソールにログインし、管理する インスタンス のカードをクリックします。左側のナビゲーションウィンドウで、[メッセージ転送] > [サーバー側サブスクリプション] を選択します。[コンシューマーグループ] タブで、管理するコンシューマーグループを見つけ、[アクション] 列の [表示] をクリックします。各クライアントの ID は、[コンシューマーグループステータス] タブに表示されます。クライアント ID を使用すると、クライアントを簡単に識別できます。

host

AMQP クライアントが IoT Platform に接続するために使用するエンドポイント。

${YourHost} 変数に指定できるエンドポイントの詳細については、「インスタンスのエンドポイントを管理する」をご参照ください。

サンプル結果

  • 次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続されており、メッセージを受信できます。成功

  • 次のような出力が表示された場合、AMQP クライアントは IoT Platform に接続できません。

    ログに基づいてコードまたはネットワーク環境を確認し、問題を解決してから、コードを再度実行できます。

    失败

関連情報

サーバー側サブスクリプション機能に関連するエラーコードの詳細については、「IoT プラットフォームログ」トピックの「メッセージに関連するエラーコード」セクションをご参照ください。