背景情報
ApsaraMQ for MQTT はクラウド SDK をサポートしています。クラウド SDK を使用して、クラウドアプリケーションを ApsaraMQ for MQTT ブローカーに接続し、メッセージを送受信できます。クラウド SDK の使用方法については、概要 をご参照ください。
ApsaraMQ for MQTT は、ApsaraMQ for MQTT と他の Alibaba Cloud サービス間のデータ交換もサポートしています。現在、ApsaraMQ for MQTT と ApsaraMQ for RocketMQ 間でのみデータを交換できます。
このトピックでは、Java 用 SDK を使用して、インターネット経由で ApsaraMQ for MQTT から ApsaraMQ for RocketMQ にデータをエクスポートする方法について説明します。
このシナリオでは、複数のプログラミング言語用のサードパーティのオープンソース SDK を使用して、メッセージを送受信できます。詳細については、SDK のダウンロード をご参照ください。

ネットワークアクセス
ApsaraMQ for MQTT は、インターネットアクセスポイントと VPC アクセスポイントを提供します。
インターネットアクセスポイントは、インターネット経由で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、パブリックエンドポイントは IoT およびモバイルインターネットのシナリオで使用されます。
VPC アクセスポイントは、プライベート仮想クラウド (VPC) 内で ApsaraMQ for MQTT にアクセスするために使用される IP アドレスです。ほとんどの場合、VPC エンドポイントはクラウドアプリケーションによって ApsaraMQ for MQTT に接続するために使用されます。
重要 エンドポイントを使用してクライアントを ApsaraMQ for MQTT に接続する場合は、IP アドレスではなくドメイン名を使用してください。IP アドレスは動的に変化するためです。ApsaraMQ for MQTT 技術チームは、以下のシナリオにおける障害および直接的または間接的な損失について責任を負いません。
IP アドレスを使用してクライアントに ApsaraMQ for MQTT にアクセスします。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、元の IP アドレスは無効になります。
クライアントが実行されているネットワークで、IP アドレスのファイアウォールポリシーが設定されています。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、ファイアウォールポリシーにより新しい IP アドレスがブロックされます。
前提条件
統合開発環境 (IDE) がインストールされていること。詳細については、IDE をご参照ください。IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用しています。
Java 8 または 11 がインストールされていること。詳細については、Java のダウンロード をご参照ください。
ApsaraMQ for MQTT インスタンスが作成され、インスタンスにトピックとグループが作成されていること。詳細については、リソースの作成 をご参照ください。
ApsaraMQ for RocketMQ インスタンスが作成され、インスタンスにトピックとグループが作成されていること。詳細については、手順 2: リソースの作成 をご参照ください。
重要 ApsaraMQ for MQTT のデータアウトバウンドルールを使用して、ApsaraMQ for RocketMQ 4.x インスタンスにのみデータをエクスポートできます。
リージョンをまたがって ApsaraMQ for MQTT のデータアウトバウンドルールを使用することはできません。データアウトバウンドルールを作成する際は、ApsaraMQ for MQTT リソースと ApsaraMQ for RocketMQ リソースが同じリージョンにあることを確認してください。
1. データアウトバウンドルールの作成
ApsaraMQ for MQTT コンソール にログオンします。左側のナビゲーションペインで、インスタンスリスト をクリックします。
上部のナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。[インスタンス] ページで、インスタンス名をクリックして インスタンスの詳細 ページに移動します。
左側のナビゲーションペインで、ルール管理 をクリックします。[ルール] ページの左上隅にある ルールの作成 をクリックします。
ルールの作成 ウィザードで、次の手順を実行します。
[基本情報の構成] 手順で、ルール ID を入力し、[ルールの種類] パラメーターに [データアウトバウンド] を選択します。

[ルールソースの構成] 手順で、ApsaraMQ for MQTT インスタンスで作成されたトピックを選択します。

[ルールの宛先の構成] 手順で、作成済みの ApsaraMQ for RocketMQ インスタンスと、そのインスタンスで作成済みのトピックを選択します。

2. テストコードの準備
2.1 サンプルコードのダウンロード
mqtt-java-demo デモプロジェクトをダウンロードし、デモプロジェクトパッケージをオンプレミスマシンのフォルダーに解凍します。
解凍したデモプロジェクトで、lmq-java-demo フォルダーを探し、そのフォルダーを IntelliJ IDEA にインポートし、pom.xml ファイルに次の依存関係が含まれていることを確認します。
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
アクセス認証情報を構成します。
AccessKey ペアを取得します。AccessKey ペアの取得方法については、AccessKey の作成 をご参照ください。
環境変数を構成します。ApsaraMQ for MQTT へのアクセスに使用される AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT へのアクセスに使用される AccessKey シークレットの環境変数名は MQTT_SK_ENV です。環境変数の構成方法については、アクセス認証情報の構成 をご参照ください。
2.2 メッセージング用コードの構成
MQ4IoTSendMessageToRocketMQ.java クラスには、ApsaraMQ for MQTT を使用してメッセージを送信し、ApsaraMQ for RocketMQ を使用してメッセージを受信するためのコードが含まれています。コード内のコメントに基づいて、ApsaraMQ for MQTT リソースと ApsaraMQ for RocketMQ リソースのパラメーターを指定する必要があります。
メッセージングのサンプルコード
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToRocketMQ {
public static void main(String[] args) throws Exception {
/**
* レシーバーとして ApsaraMQ for RocketMQ クライアントを初期化します。ほとんどのビジネスシナリオでは、レシーバーはバックエンドアプリケーションにデプロイされます。
*/
Properties properties = new Properties();
/**
* ApsaraMQ for RocketMQ コンソールで作成したグループの ID。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
/**
* アイデンティティ認証のために Alibaba Cloud Resource Access Management (RAM) コンソールで作成した AccessKey ID。
* Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。
* AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
* この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* アイデンティティ認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。このパラメーターは、署名認証モードを使用する場合にのみ必須です。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* ApsaraMQ for RocketMQ インスタンスへのアクセスに使用する TCP エンドポイント。TCP エンドポイントは、ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページで取得できます。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
/**
* ApsaraMQ for RocketMQ コンソールで作成したトピック。
* ApsaraMQ for RocketMQ と ApsaraMQ for MQTT 間でデータを交換する場合、ApsaraMQ for RocketMQ クライアントは親トピックのみ使用できます。
*/
final String parentTopic = "XXXXX";
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(parentTopic, "*", new MessageListener() {
public Action consume(Message message, ConsumeContext consumeContext) {
System.out.println("recv msg:" + message);
return Action.CommitMessage;
}
});
consumer.start();
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* センダーとして ApsaraMQ for MQTT クライアントを初期化します。ほとんどのビジネスシナリオでは、センダーはモバイル端末にデプロイされます。
*/
/**
* コンソールで作成した ApsaraMQ for MQTT インスタンスの ID。
*/
String instanceId = "XXXXX";
/**
* ApsaraMQ for MQTT インスタンスへのアクセスに使用するエンドポイント。エンドポイントは、ApsaraMQ for MQTT コンソールの [インスタンスの詳細] ページで取得できます。
*/
String endPoint = "XXXXXX.mqtt.aliyuncs.com";
/**
* アイデンティティ認証のために Alibaba Cloud RAM コンソールで作成した AccessKey ID。
* Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを防ぐため、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。
* AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウント内のすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
* この例では、AccessKey ID と AccessKey シークレットは環境変数に格納されています。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* アイデンティティ認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。このパラメーターは、署名認証モードを使用する場合にのみ必須です。
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* システムが ApsaraMQ for MQTT クライアントに割り当てるグローバルに一意の ID。クライアント ID は、TCP 接続ごとに異なる必要があります。複数の TCP 接続が同じクライアント ID を使用すると、例外が発生し、接続が予期せず閉じられます。
* クライアント ID は、グループ ID とデバイス ID で構成され、GroupID@@@DeviceID 形式です。グループ ID は、ApsaraMQ for MQTT コンソールで作成したグループの ID です。デバイス ID は、指定するカスタム ID です。クライアント ID は 64 文字以下にする必要があります。
*/
String clientId = "GID_XXXX@@@XXXXX";
/**
* ApsaraMQ for MQTT では、サブトピックを使用してメッセージをフィルタリングできます。サブトピックの名前として文字列を指定できます。
* mq4IotTopic パラメーターの値は、最大 128 文字までです。
*/
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
/**
* メッセージ送信のサービス品質 (QoS) レベル。有効な値: 0、1、2。詳細については、「用語」をご参照ください。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* ApsaraMQ for MQTT クライアントが使用するプロトコルとポート。ApsaraMQ for MQTT クライアントが使用するプロトコルとポートは一致している必要があります。Secure Sockets Layer (SSL) 暗号化を使用する場合は、プロトコルとポートとして ssl://endpoint:8883 を指定します。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* ApsaraMQ for MQTT クライアントが応答を待つタイムアウト期間。タイムアウト期間により、ApsaraMQ for MQTT クライアントが無期限に応答を待つことを防ぎます。
*/
mqttClient.setTimeToWait(5000);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* クライアント接続が確立された後、できるだけ早くコンシューマーがサブスクライブする必要があるトピック。
*/
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
/**
* メッセージの送信先となるトピック。送信されるメッセージが通常のメッセージの場合、トピックはコンシューマーがサブスクライブするトピック、またはワイルドカードを使用して一致させることができるトピックである必要があります。
*/
mqttClient.publish(mq4IotTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}
3. 結果の確認
MQ4IoTSendMessageToRocketMQ.java クラスの main 関数を実行し、次のいずれかの方法を使用してメッセージの送受信を確認できます。
コードの使用
次の図のコードに類似したコードが表示された場合、メッセージは ApsaraMQ for MQTT によって送信され、ApsaraMQ for RocketMQ によって消費されます。

コンソールの使用
メッセージが送信されたかどうかを確認します。ApsaraMQ for MQTT コンソールの [インスタンスの詳細] ページに移動し、左側のナビゲーションペインで [メッセージトレースクエリ] をクリックします。[メッセージトレースクエリ] ページで、グループ ID とデバイス ID を使用してメッセージが送信されたかどうかを確認します。

メッセージが消費されたかどうかを確認します。ApsaraMQ for RocketMQ コンソールの [インスタンスの詳細] ページに移動し、左側のナビゲーションペインで [メッセージクエリ] をクリックします。[メッセージクエリ] ページで、次の図に示すように、トピックに基づいてメッセージが ApsaraMQ for RocketMQ に転送されたかどうかを確認します。

クエリされたメッセージの [アクション] 列にある [メッセージトレース] をクリックします。次の図の情報に類似した情報が表示された場合、メッセージは消費されています。
