IoT デバイスやモバイルアプリがバックエンドミドルウェアを介さずに直接メッセージを交換する必要がある場合、パブリッシャーとサブスクライバーの両方を ApsaraMQ for MQTT クライアントとして接続できます。各クライアントは Eclipse Paho Java SDK を使用して、インターネット経由で ApsaraMQ for MQTT ブローカーに接続し、トピックにメッセージをパブリッシュし、サブスクライブされたトピックからメッセージを受信します。

前提条件
開始する前に、以下を準備してください。
ApsaraMQ for MQTT リソース (インスタンス、グループ ID、親トピック) を作成済みであること
認証用の AccessKey ペア
JDK がインストールされていること
IntelliJ IDEA または Eclipse などの IDE
エンドポイント
ApsaraMQ for MQTT インスタンスのエンドポイントは、クライアント SDK を介して接続する際に指定します。エンドポイント形式は、ご利用のネットワークアクセス方法によって異なります。
| アクセス方法 | エンドポイント形式 | 一般的なユースケース |
|---|---|---|
| パブリック (インターネット) | <instance-id>.mqtt.aliyuncs.com | IoT デバイス、モバイルアプリ |
| VPC (プライベートネットワーク) | <instance-id>-internal-vpc.mqtt.aliyuncs.com | クラウド内のバックエンドアプリケーション |
ApsaraMQ for MQTT コンソールの[インスタンス詳細] ページの[エンドポイント] タブでエンドポイントを確認できます。インスタンス ID は[基本情報] セクションに表示されます。
常に IP アドレスではなくドメイン名を使用してください。名前解決が更新されると、IP アドレスは予告なく変更される場合があります。Alibaba Cloud は、ハードコードされた IP アドレスや IP ベースのファイアウォールルールによって引き起こされる接続失敗について責任を負いません。
ステップ 1: Maven 依存関係の追加
デモプロジェクトをクローンまたはダウンロードし、pom.xml に以下の依存関係が含まれていることを確認します。
<dependencies>
<!-- Eclipse Paho: MQTT 3.1.1 client library -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!-- Commons Codec: HMAC signature calculation for authentication -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<!-- Apache HttpClient: HTTP requests for token-based auth -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<!-- Fastjson: JSON parsing for API responses -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Alibaba Cloud SDK for MQTT: server-side API calls -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<!-- Alibaba Cloud Core SDK: shared SDK infrastructure -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>Eclipse Paho Java Client のソースコードとドキュメントについては、「Eclipse Paho Java Client」をご参照ください。
ステップ 2: 環境変数の設定
ソースコードでの認証情報の偶発的な漏洩を防ぐため、AccessKey ペアを環境変数に保存します。
export MQTT_AK_ENV=<your-access-key-id>
export MQTT_SK_ENV=<your-access-key-secret>| プレースホルダー | 説明 | 例 |
|---|---|---|
<your-access-key-id> | RAM コンソールからの AccessKey ID | LTAI5tXxx |
<your-access-key-secret> | RAM コンソールからの AccessKey Secret | xXxXxXx |
Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアではなく、RAM ユーザーの AccessKey ペアを使用してください。Alibaba Cloud アカウント (root ユーザー) の AccessKey ペアは、すべての API オペレーションへのアクセスを許可し、漏洩した場合にセキュリティリスクをもたらします。詳細については、「アクセス認証情報の設定」をご参照ください。
ステップ 3: 接続パラメーターの定義
ApsaraMQ for MQTT コンソールで作成したリソースに基づいて、以下のパラメーターを設定します。
// コンソールの基本情報セクションからのインスタンス ID
String instanceId = "<your-instance-id>";
// コンソールのエンドポイントタブからのエンドポイント
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
// 環境変数からの認証情報
String accessKey = System.getenv("MQTT_AK_ENV");
// AccessKey Secret は署名認証モードに必要です
String secretKey = System.getenv("MQTT_SK_ENV");
// クライアント ID 形式: GroupID@@@DeviceID (最大 64 文字)
// 各 TCP 接続は一意のクライアント ID を使用する必要があります。
// 重複するクライアント ID は接続競合と予期せぬ切断を引き起こします。
String clientId = "<your-group-id>@@@<your-device-id>";
// コンソールで作成された親トピック
final String parentTopic = "<your-parent-topic>";
// メッセージフィルタリング用のサブトピック (合計最大 128 文字)
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
// QoS レベル: 0 (最大 1 回)、1 (少なくとも 1 回)、または 2 (厳密に 1 回)
final int qosLevel = 0;クライアントが存在しないトピック、またはクライアントがアクセスを許可されていないトピックにパブリッシュまたはサブスクライブした場合、ブローカーは直ちに接続を閉じます。
ステップ 4: MQTT クライアントの作成とコールバックの登録
接続する前にコールバックを登録してください。connect() の後にコールバックを登録すると、特に永続セッションを再開する際に、メッセージを見逃す可能性があります。
MQTT クライアントを作成します。
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
// プロトコルとポートは一致する必要があります。
// TCP -> tcp://エンドポイント:1883
// SSL -> ssl://エンドポイント:8883
final MqttClient mqttClient = new MqttClient(
"tcp://" + endPoint + ":1883", clientId, memoryPersistence);
// ブローカー応答を待機するタイムアウト (ミリ秒)
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());接続イベントと受信メッセージを処理するためのイベントコールバックを設定します。
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 各接続 (再接続を含む) の直後にサブスクライブします。
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// ここで受信メッセージを処理します。
// このコールバックから例外をスローしないでください。ブローカーは、
// 正常に返されたコールバックをメッセージの確認応答として扱います。
// べき等な消費を確実にするために、重複排除を実装してください。
System.out.println(
"receive msg from topic " + topic + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});主要なコールバック動作:
| コールバック | 動作 |
|---|---|
connectComplete | 自動再接続を含む、各接続時に呼び出されます。サブスクリプションを復元するために、ここで再サブスクライブします。 |
connectionLost | 接続が切断されたときに呼び出されます。エラーをログに記録するか、再接続ロジックを実装します。 |
messageArrived | メッセージが到着したときに呼び出されます。例外をスローしないでください。ブローカーは、正常に返されたコールバックを確認応答として扱います。タイムアウト期間内にメッセージを消費してください。べき等処理のために重複排除を実装してください。 |
deliveryComplete | QoS 1 または QoS 2 メッセージが配信されたときに呼び出されます。 |
ステップ 5: 接続とメッセージのパブリッシュ
ブローカーに接続し、メッセージをパブリッシュします。この例では、通常の Pub/Sub メッセージとポイントツーポイント (P2P) メッセージの両方を送信します。
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
// サブスクライブされたトピックに通常のメッセージをパブリッシュします。
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
// 特定のクライアントに直接 P2P メッセージをパブリッシュします。
// P2P トピック形式: {parentTopic}/p2p/{targetClientId}
// ターゲットクライアントはこのトピックをサブスクライブする必要はありません。
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
// メッセージの受信を継続するためにクライアントを実行し続けます。
Thread.sleep(Long.MAX_VALUE);P2P メッセージング: 正確なターゲットクライアントがわかっている場合、トピック {parentTopic}/p2p/{targetClientId} にメッセージを送信します。ターゲットクライアントはサブスクライブせずにメッセージを受信します。これにより、メッセージが単一の受信者を対象とするシナリオが簡素化されます。
完全なコード
以下のリストは、上記のすべての手順を単一の実行可能なクラスに統合しています。また、これは GitHub 上のデモプロジェクト でも利用可能です。
package com.aliyun.openservices.lmq.example.demo;
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
// --- 接続パラメーター ---
String instanceId = "<your-instance-id>";
String endPoint = "<your-instance-id>.mqtt.aliyuncs.com";
String accessKey = System.getenv("MQTT_AK_ENV");
String secretKey = System.getenv("MQTT_SK_ENV");
String clientId = "<your-group-id>@@@<your-device-id>";
final String parentTopic = "<your-parent-topic>";
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
final int qosLevel = 0;
// --- クライアントの作成 ---
ConnectionOptionWrapper connectionOptionWrapper =
new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient = new MqttClient(
"tcp://" + endPoint + ":1883", clientId, memoryPersistence);
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// --- 接続前のコールバック登録 ---
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println(
"receive msg from topic " + topic + " , body is "
+ new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : "
+ iMqttDeliveryToken.getTopics()[0]);
}
});
// --- 接続とパブリッシュ ---
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(mq4IotTopic, message);
final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
message.setQos(qosLevel);
mqttClient.publish(p2pSendTopic, message);
}
Thread.sleep(Long.MAX_VALUE);
}
}結果の検証
アプリケーションを実行した後、ApsaraMQ for MQTT コンソールでメッセージトレースをクエリして、メッセージが送受信されたことを確認します。詳細については、「メッセージトレースをクエリする」をご参照ください。