すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:ApsaraMQ for RocketMQ から ApsaraMQ for MQTT へのデータのインポート

最終更新日:Jan 14, 2025

順序メッセージやトランザクションメッセージなど、ApsaraMQ for RocketMQ の特定の機能をクラウドアプリケーションで使用する場合、データの受信または送信ルールを使用して ApsaraMQ for MQTTApsaraMQ for RocketMQ 間のデータを交換できます。このトピックでは、ApsaraMQ for RocketMQ から ApsaraMQ for MQTT にデータをインポートする方法について説明します。

背景情報

ApsaraMQ for MQTT はクラウド SDK をサポートしています。クラウド SDK を使用して、クラウドアプリケーションを ApsaraMQ for MQTT ブローカーに接続し、メッセージを送受信できます。クラウド SDK の使用方法については、概要 をご参照ください。

ApsaraMQ for MQTT は、ApsaraMQ for MQTT と他の Alibaba Cloud サービス間のデータ交換もサポートしています。現在、ApsaraMQ for MQTT と ApsaraMQ for RocketMQ 間でのみデータを交換できます。

このトピックでは、SDK for Java を使用して、インターネット経由で ApsaraMQ for RocketMQ から ApsaraMQ for MQTT にデータをインポートする方法について説明します。

quick_start_data_inflow

ネットワークアクセス

ApsaraMQ for MQTT は、インターネットアクセスポイントVPC アクセスポイントを提供します。

  • インターネットアクセスポイントは、インターネット経由で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、パブリックエンドポイントは IoT およびモバイルインターネットのシナリオで使用されます。

  • VPC アクセスポイントは、プライベート仮想クラウド (VPC) 内で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、VPC エンドポイントは、クラウドアプリケーションが ApsaraMQ for MQTT に接続するために使用されます。

重要

エンドポイントを使用してクライアントを ApsaraMQ for MQTT に接続する場合は、IP アドレスの代わりにドメイン名を使用してください。IP アドレスは動的に変化するためです。ApsaraMQ for MQTT 技術チームは、以下のシナリオにおける障害および直接的または間接的な損失について責任を負いません。

  • IP アドレスを使用してクライアントに ApsaraMQ for MQTT にアクセスします。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、元の IP アドレスは無効になります。

  • クライアントが実行されているネットワークで、IP アドレスのファイアウォールポリシーが設定されています。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、ファイアウォールポリシーにより新しい IP アドレスがブロックされます。

前提条件

  • 統合開発環境 (IDE) がインストールされていること。詳細については、IDE をご参照ください。IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用しています。

  • Java 8 または 11 がインストールされていること。詳細については、Java のダウンロード をご参照ください。

  • ApsaraMQ for MQTT インスタンスが作成され、インスタンスにトピックとグループが作成されていること。詳細については、リソースの作成 をご参照ください。

  • ApsaraMQ for RocketMQ インスタンスが作成され、インスタンスにトピックとグループが作成されていること。詳細については、手順 2: リソースの作成 をご参照ください。

重要
  • ApsaraMQ for MQTT のデータ受信ルールを使用して、ApsaraMQ for RocketMQ 4.x インスタンスからのみデータをインポートできます。

  • リージョンをまたがって ApsaraMQ for MQTT のデータ受信ルールを使用することはできません。データ受信ルールを作成する際は、ApsaraMQ for MQTTApsaraMQ for RocketMQ のリソースが同じリージョンにあることを確認してください。

1. データ受信ルールの作成

  1. ApsaraMQ for MQTT コンソール にログインします。左側のナビゲーションペインで、インスタンスリスト をクリックします。

  2. 上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。[インスタンス] ページで、インスタンス名をクリックして インスタンスの詳細 ページに移動します。

  3. 左側のナビゲーションペインで、ルール管理 をクリックします。[ルール] ページの左上隅にある ルールの作成 をクリックします。

  4. ルールの作成 ウィザードで、次の手順を実行します。

    1. 基本情報の構成 手順で、ルール ID を指定し、[ルールの種類] パラメーターに [データ受信] を選択します。

      image

    2. ルールソースの構成 手順で、作成済みの ApsaraMQ for RocketMQ インスタンスと、そのインスタンスに作成済みのトピックを選択します。

      image

    3. ルールの宛先の構成 手順で、ApsaraMQ for MQTT インスタンスに作成済みのトピックを選択します。

      image

2. テストコードの準備

2.1 サンプルコードのダウンロード

  1. mqtt-java-demo デモプロジェクトをダウンロードし、デモプロジェクトパッケージをオンプレミスマシンのフォルダーに解凍します。

  2. 解凍したデモプロジェクトで、lmq-java-demo フォルダーを探し、そのフォルダーを IntelliJ IDEA にインポートし、pom.xml ファイルに次の依存関係が含まれていることを確認します。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. アクセス認証情報を構成します。

    • AccessKey ペアを取得します。AccessKey ペアの取得方法については、AccessKey の作成 をご参照ください。

    • 環境変数を構成します。ApsaraMQ for MQTT へのアクセスに使用する AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT へのアクセスに使用する AccessKey シークレットの環境変数名は MQTT_SK_ENV です。環境変数の構成方法については、アクセス認証情報の構成 をご参照ください。

2.2 メッセージング用コードの構成

RocketMQSendMessageToMQ4IoT.java クラスには、ApsaraMQ for RocketMQ を使用してメッセージを送信し、ApsaraMQ for MQTT を使用してメッセージを受信するためのコードが含まれています。コード内のコメントに基づいて、ApsaraMQ for RocketMQ リソースと ApsaraMQ for MQTT リソースのパラメーターを指定する必要があります。

メッセージングをテストする場合は、ポイントツーポイント (P2P) メッセージの送信に関連するコードをコメントアウトできます。サンプルコード:

メッセージングのサンプルコード

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RocketMQSendMessageToMQ4IoT {
    public static void main(String[] args) throws Exception {
        /**
         * 送信者として ApsaraMQ for RocketMQ クライアントを初期化します。ほとんどのビジネスシナリオでは、送信者はバックエンドアプリケーションにデプロイされます。
         */
        Properties properties = new Properties();
        /**
         * ApsaraMQ for RocketMQ コンソールで作成したグループの ID。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        /**
         * ID 認証のために Alibaba Cloud Resource Access Management (RAM) コンソールで作成した AccessKey ID。
         * Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。
         * AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
         * この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。AccessKey シークレットは、署名認証モードを使用する場合にのみ必要です。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * ApsaraMQ for RocketMQ インスタンスにアクセスするために使用される TCP エンドポイント。TCP エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページで取得できます。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * ApsaraMQ for RocketMQ トピック。ApsaraMQ for RocketMQ コンソールでトピックを作成できます。
         * ApsaraMQ for RocketMQ と ApsaraMQ for MQTT 間でデータを交換する場合、ApsaraMQ for RocketMQ クライアントは親トピックのみ使用できます。
         */
        final String parentTopic = "XXXXX";
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 受信者として ApsaraMQ for MQTT クライアントを初期化します。ほとんどのビジネスシナリオでは、受信者はモバイル端末にデプロイされます。
         */

        /**
         * ApsaraMQ for MQTT コンソールで作成した ApsaraMQ for MQTT インスタンスの ID。
         */
        String instanceId = "XXXXX";
        /**
         * ApsaraMQ for MQTT インスタンスにアクセスするために使用するエンドポイント。エンドポイントは、ApsaraMQ for MQTT コンソールの [インスタンスの詳細] ページで取得できます。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey ID。
         * Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。
         * AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
         * この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。このパラメーターは、署名認証モードを使用する場合にのみ必要です。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * システムが ApsaraMQ for MQTT クライアントに割り当てるグローバルに一意の ID。クライアント ID は、TCP 接続ごとに異なる必要があります。複数の TCP 接続が同じクライアント ID を使用すると、例外が発生し、接続が予期せず閉じられます。
         * クライアント ID は、グループ ID とデバイス ID で構成され、GroupID@@@DeviceID 形式です。グループ ID は、ApsaraMQ for MQTT コンソールで作成したグループの ID です。デバイス ID は、指定するカスタム ID です。クライアント ID は 64 文字以下にする必要があります。
         */
        String clientId = "GID_XXXX@@@XXXXX";
        /**
         * ApsaraMQ for MQTT では、サブトピックを使用してメッセージをフィルタリングできます。サブトピックの名前として文字列を指定できます。
         * mq4IotTopic パラメーターの値は、最大 128 文字まで指定できます。
         */
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
        /**
         * メッセージ送信のサービス品質 (QoS) レベル。有効な値: 0、1、2。詳細については、「用語」をご参照ください。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * ApsaraMQ for MQTT クライアントが使用するプロトコルとポート。ApsaraMQ for MQTT クライアントが使用するプロトコルとポートは一致する必要があります。Secure Sockets Layer (SSL) 暗号化を使用する場合は、プロトコルとポートとして ssl://endpoint:8883 を指定します。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * ApsaraMQ for MQTT クライアントが応答を待つタイムアウト期間。タイムアウト期間を設定することで、ApsaraMQ for MQTT クライアントが無期限に応答を待つことを防ぎます。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * クライアント接続が確立された後、できるだけ早くコンシューマーがサブスクライブする必要があるトピック。
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * パブリッシュされたメッセージを消費するために呼び出されるコールバック。コールバックが例外をスローしないようにしてください。コールバックに対して応答が返された場合、メッセージは消費されます。
                 * メッセージは特定の期間内に消費される必要があります。ApsaraMQ for MQTT ブローカーによって指定されたタイムアウト期間内にメッセージが消費されない場合、ブローカーはメッセージの再送信を試みる可能性があります。メッセージ消費の冪等性を確保するために、重複排除が実行されていることを確認してください。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            /**
             * ApsaraMQ for RocketMQ クライアントを使用して ApsaraMQ for MQTT クライアントにメッセージを送信する場合は、トピックとして親トピックを、タグとして MQ2MQTT を指定します。
             */
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
            /**
             * ApsaraMQ for RocketMQ クライアントを使用して ApsaraMQ for MQTT クライアントにメッセージを送信する場合は、MqttSecondTopic パラメーターを使用してサブトピックを指定できます。
             */
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
//            /**
//             * P2P メッセージを送信し、サブトピックを指定します。
//             */
//            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
//            result = producer.send(msg);
//            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3. 結果の確認

RocketMQSendMessageToMQ4IoT.java クラスの main 関数を呼び出し、次のいずれかの方法を使用して、メッセージの送受信を確認できます。

コードの使用

次のようなコードが表示された場合、メッセージは ApsaraMQ for RocketMQ によって送信され、ApsaraMQ for MQTT によって消費されています。

image

コンソールの使用

  • メッセージが送信されたかどうかの確認ApsaraMQ for RocketMQ コンソールのインスタンス詳細ページに移動し、左側のナビゲーションペインで [メッセージクエリ] をクリックします。[メッセージクエリ] ページで、トピックとメッセージ ID によってメッセージが送信されたかどうかを確認します。

    image

  • メッセージが消費されたかどうかの確認ApsaraMQ for MQTT コンソールのインスタンス詳細ページに移動し、左側のナビゲーションペインで Message trace query をクリックします。[メッセージトレースクエリ] ページで、メッセージ ID によってメッセージが消費されたかどうかを確認します。

    image

関連情報