本チュートリアルでは、ApsaraMQ for MQTT クライアントからメッセージを送信し、バックエンドサービスアプリケーションで受信する手順について説明します。クライアント側では クライアント SDK(Eclipse Paho)を使用してメッセージをパブリッシュし、バックエンド側では クラウド SDK を使用してメッセージをコンシュームします。
仕組み
ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーションは、ApsaraMQ for MQTT ブローカーを介して通信します。それぞれの側で使用される SDK は異なります。
| SDK | ライブラリ | 接続対象 | 利用シーン |
|---|---|---|---|
| クライアント SDK | Eclipse Paho Java Client | IoT デバイス、モバイルアプリ | MQTT を使用したメッセージのパブリッシュおよびサブスクライブ |
| クラウド SDK | ApsaraMQ for MQTT サーバー SDK | バックエンドサービスアプリケーション | 大規模なメッセージのコンシュームまたは送信、親トピックレベルでのサブスクライブ |

前提条件
開始する前に、以下の環境が整っていることを確認してください。
ApsaraMQ for MQTT インスタンス(グループ、トピックなどの必要なリソースを含む)。詳細については、「リソースの作成」をご参照ください。
AccessKey ペア。詳細については、「AccessKey ペアの取得」をご参照ください。
JDK のインストール
IntelliJ IDEA または Eclipse のインストール(本チュートリアルでは IntelliJ IDEA を使用します)
エンドポイント
両方の SDK では、ご利用の ApsaraMQ for MQTT インスタンスに接続するためのエンドポイントが必要です。
クライアント SDK のエンドポイント
| アクセスタイプ | フォーマット | 代表的な利用シーン |
|---|---|---|
| パブリックエンドポイント | <instance-id>.mqtt.aliyuncs.com | IoT デバイス、モバイルアプリ |
| VPC エンドポイント | <instance-id>-internal-vpc.mqtt.aliyuncs.com | VPC 内のクライアント |
ApsaraMQ for MQTT コンソールの エンドポイント タブ(インスタンスの詳細 ページ内)で、クライアント SDK のエンドポイントを確認できます。ApsaraMQ for MQTT コンソール
クラウド SDK のエンドポイント
| アクセスタイプ | フォーマット | 代表的な利用シーン |
|---|---|---|
| パブリックエンドポイント | <instance-id>-server-internet.mqtt.aliyuncs.com | インターネット経由のバックエンドアプリケーション |
| VPC エンドポイント | <instance-id>-server-internal.mqtt.aliyuncs.com | VPC 内のバックエンドアプリケーション |
クラウド SDK によるアクセスに対応していないリージョンがあります。対応リージョンについては、関連ドキュメントをご参照ください。
ApsaraMQ for MQTT コンソールの 基本情報 セクション(インスタンスの詳細 ページ内)で、インスタンス ID を確認できます。ApsaraMQ for MQTT コンソール
IP アドレスではなく、必ずドメイン名を使用してください。ドメイン解決の更新時に IP アドレスが変更される場合があり、事前の通知はありません。以下の理由による接続障害については、ApsaraMQ for MQTT は責任を負いません。
DNS 更新後に無効となるハードコードされた IP アドレスの使用
DNS 更新後に新しい IP アドレスをブロックするファイアウォールルール
ステップ 1:クライアント SDK プロジェクトのセットアップ
Java 向けサードパーティ製オープンソース SDK をダウンロードします:Eclipse Paho Java Client。
デモプロジェクト mqtt-java-demo をダウンロードし、展開して IntelliJ IDEA にインポートします。
pom.xml に以下の依存関係が含まれていることを確認してください。
<dependencies>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>ステップ 2:認証情報の設定
認証のために、以下の環境変数を設定します。「アクセス認証情報の設定」をご参照ください。
# AccessKey ID
export MQTT_AK_ENV=<your-access-key-id>
# AccessKey secret
export MQTT_SK_ENV=<your-access-key-secret>ソースコード内に AccessKey ペアをハードコードしないでください。認証情報の漏洩を防ぐため、環境変数に格納してください。
ステップ 3:クライアント SDK を使用したメッセージ送信
MQ4IoTProducerDemo.java を開き、ApsaraMQ for MQTT インスタンスの値に合わせて以下のパラメーターを更新します。「リソースの作成」で各値の確認方法をご確認ください。
| パラメーター | 説明 | 例 |
|---|---|---|
instanceId | ApsaraMQ for MQTT インスタンス ID | post-cn-xxxxx |
endPoint | クライアント SDK エンドポイント(上記「エンドポイント」セクションを参照) | post-cn-xxxxx.mqtt.aliyuncs.com |
clientId | {GroupID}@@@{DeviceID} 形式のグローバルに一意なクライアント ID。最大 64 文字。各 TCP 接続には異なるクライアント ID を使用する必要があります。 | GID_test@@@device_001 |
parentTopic | コンソールで作成した親トピック | testTopic |
ブローカーへの接続
MQTT クライアントを作成し、接続を確立します。プレーンテキスト接続には tcp://endpoint:1883、SSL/TLS 接続には ssl://endpoint:8883 を使用します。
String instanceId = "XXXXX";
String endPoint = "XXXXX.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient =
new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// 応答タイムアウト(ミリ秒)を設定し、無限待機を回避します
mqttClient.setTimeToWait(5000);クライアント ID は {GroupID}@@@{DeviceID} 形式であり、TCP 接続ごとにグローバルに一意である必要があります。同一のクライアント ID を複数の接続で再利用すると、ブローカーが既存のセッションを切断します。コールバックの設定
セッション再開時にメッセージを欠落させないよう、接続前にコールバックを設定します。
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// ブローカーのタイムアウト内にメッセージを処理し、再配信を回避します。
// 冪等性を保証するため、重複排除を実装します。
System.out.println(
"receive msg from topic " + s + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());メッセージのパブリッシュ
サブトピックにメッセージをパブリッシュします。サブトピックは親トピックに追加され、最大 128 文字まで指定できます。
final String parentTopic = "XXXXX";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);QoS レベル: 0(最大 1 回)、1(最低 1 回)、2(正確に 1 回)。
ポイント・ツー・ポイント(P2P)メッセージの送信
P2P メッセージングでは、特定のクライアントに対して直接メッセージを送信します。宛先クライアントはトピックをサブスクライブする必要はありません。{parentTopic}/p2p/{targetClientId} にパブリッシュします。
String receiverId = "GID_test@@@device_002";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
MqttMessage p2pMessage = new MqttMessage("hello mq4Iot p2p msg".getBytes());
p2pMessage.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, p2pMessage);プロデューサーコードの完成版
以下は、上記すべての手順を統合したコードです。MQ4IoTProducerDemo.java を開き、パラメーターを更新してください。
package com.aliyun.openservices.lmq.example.demo;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTProducerDemo {
public static void main(String[] args) throws Exception {
// ApsaraMQ for MQTT インスタンス ID
String instanceId = "XXXXX";
// クライアント SDK エンドポイント。IP アドレスではなくドメイン名を使用します。
String endPoint = "XXXXX.mqtt.aliyuncs.com";
// 環境変数から認証情報を読み込みます
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
// クライアント ID 形式:{GroupID}@@@{DeviceID}
// TCP 接続ごとに一意である必要があります。最大 64 文字。
String clientId = "GID_XXXXX@@@XXXXX";
// ApsaraMQ for MQTT コンソールで作成した親トピック。
// 無効または未承認のトピックを指定すると、ブローカーが接続を閉じます。
final String parentTopic = "XXXXX";
// メッセージフィルタリング用のサブトピック。最大 128 文字。
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
// QoS レベル:0、1、または 2
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
// ポート 1883 で TCP 接続を行います。SSL 接続の場合は ssl://endpoint:8883 を使用します。
final MqttClient mqttClient =
new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// 応答タイムアウト(ミリ秒)を設定し、無限待機を回避します
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// セッション再開時にメッセージを欠落させないよう、接続前にコールバックを設定します
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// ブローカーのタイムアウト内にメッセージを処理し、再配信を回避します。
// 冪等性を保証するため、重複排除を実装します。
System.out.println(
"receive msg from topic " + s + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
// サブトピックへのパブリッシュ(標準のパブリッシュ/サブスクライブ)
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
// 特定のクライアントへ直接送信するポイント・ツー・ポイント(P2P)メッセージ。
// 宛先クライアントはこのトピックをサブスクライブする必要はありません。
// トピック形式:{parentTopic}/p2p/{targetClientId}
String receiverId = "xxx";
final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}プロデューサーの検証
MQ4IoTProducerDemo.java を実行します。接続が成功すると、次のような出力が表示されます。
connect success
send msg succeed topic is : XXXXX/testMq4Iot
send msg succeed topic is : XXXXX/p2p/xxx
...ステップ 4:クラウド SDK を使用したメッセージ受信
クラウド SDK を使用すると、バックエンドアプリケーションを ApsaraMQ for MQTT ブローカーに接続できます。クライアント SDK とは異なり、クラウド SDK は親トピックのみ(サブトピックは不可)をサブスクライブするため、トピック下のすべてのメッセージを収集・分析するバックエンドシナリオに適しています。
プロジェクトのセットアップ
クラウド SDK をダウンロードします。「リリースノート」から最新バージョンをご確認ください。
デモプロジェクトをダウンロードします:mqtt-server-sdk-demo。
パッケージを展開し、IntelliJ IDEA にインポートします。
pom.xmlに以下の依存関係が含まれていることを確認してください。<dependencies> <dependency> <groupId>com.alibaba.mqtt</groupId> <artifactId>server-sdk</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies>
コンシューマーの設定と実行
MQTTConsumerDemo.java を開き、以下のパラメーターを更新します。
| パラメーター | 説明 | 例 |
|---|---|---|
domain | クラウド SDK エンドポイント(上記「エンドポイント」セクションを参照) | post-cn-xxxxx-server-internet.mqtt.aliyuncs.com |
port | クラウド SDK ポート。常に 5672。 | 5672 |
instanceId | ApsaraMQ for MQTT インスタンス ID | post-cn-xxxxx |
firstTopic | サブスクライブする親トピック | testTopic |
認証情報は、ステップ 2 で設定した環境変数(MQTT_AK_ENV および MQTT_SK_ENV)から読み込まれます。
接続およびサブスクライブ
String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
int port = 5672;
String instanceId = "post-cn-jaj3h8i****";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String firstTopic = "firstTopic";
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();
serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
@Override
public void process(String msgId, MessageProperties messageProperties,
byte[] payload) {
System.out.println("Receive:" + msgId + ","
+ JSONObject.toJSONString(messageProperties) + ","
+ new String(payload));
}
});コンシューマーコードの完成版
package com.aliyun.openservices.lmq.example;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.mqtt.server.ServerConsumer;
import com.alibaba.mqtt.server.callback.MessageListener;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ConsumerConfig;
import com.alibaba.mqtt.server.model.MessageProperties;
public class MQTTConsumerDemo {
public static void main(String[] args) throws Exception {
// クラウド SDK エンドポイント。IP アドレスではなくドメイン名を使用します。
String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
// クラウド SDK ポート。5672 である必要があります。
int port = 5672;
// ApsaraMQ for MQTT インスタンス ID
String instanceId = "post-cn-jaj3h8i****";
// 環境変数から認証情報を読み込みます
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
// サブスクライブする親トピック。
// クラウド SDK は親トピックのみをサブスクライブします(サブトピックは不可)。
// 無効または未承認のトピックを指定すると、ブローカーが接続を閉じます。
String firstTopic = "firstTopic";
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
serverConsumer.start();
// サブスクライブおよび受信メッセージの処理
serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
@Override
public void process(String msgId, MessageProperties messageProperties,
byte[] payload) {
System.out.println("Receive:" + msgId + ","
+ JSONObject.toJSONString(messageProperties) + ","
+ new String(payload));
}
});
}
}基本概念
| 概念 | 説明 |
|---|---|
| クライアント ID | フォーマット:{GroupID}@@@{DeviceID}。TCP 接続ごとにグローバルに一意である必要があります。最大 64 文字。同一のクライアント ID を複数の接続で再利用すると、ブローカーが既存のセッションを切断します。 |
| 親トピック | ApsaraMQ for MQTT コンソールで作成します。無効または未承認のトピックを指定すると、ブローカーが接続を閉じます。 |
| サブトピック | 親トピックに追加されます(例:parentTopic/testMq4Iot)。メッセージフィルタリングに使用されます。最大 128 文字。 |
| QoS | サービス品質(Quality of Service)レベル:0(最大 1 回)、1(最低 1 回)、2(正確に 1 回)。 |
| P2P メッセージング | {parentTopic}/p2p/{targetClientId} にパブリッシュすることで、クライアントがトピックをサブスクライブしなくても特定のクライアントへ直接メッセージを送信できます。 |
| プロトコルおよびポート | クライアント SDK:プレーンテキスト接続では tcp://endpoint:1883、SSL/TLS 接続では ssl://endpoint:8883。クラウド SDK:ポート 5672。 |
よくある質問
自動再接続を有効にするにはどうすればよいですか?
接続前に、setAutomaticReconnect(true) を MqttConnectOptions に設定します。
MqttConnectOptions options = connectionOptionWrapper.getMqttConnectOptions();
options.setAutomaticReconnect(true);
mqttClient.connect(options);SSL/TLS を使用して接続するにはどうすればよいですか?
tcp:// を ssl:// に置き換え、ポートを 8883 に変更します。
final MqttClient mqttClient =
new MqttClient("ssl://" + endPoint + ":8883", clientId, memoryPersistence);バックエンドアプリケーションから MQTT クライアントへメッセージを送信できますか?
はい。逆方向のメッセージ送信(バックエンド → クライアント)には、クラウド SDK のデモプロジェクトに含まれる MQTTProducerDemo.java を使用します。
次のステップ
「デモプロジェクト」で、その他のメッセージングパターンをご確認ください。
クラウド SDK のアクセスに対応しているリージョンをご確認ください。