このトピックでは、ApsaraMQ for MQTT SDK for Java を使用して、ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーション間のメッセージングを実装する方法について説明します。
前提条件
必要なリソースが作成されていること。詳細については、「リソースの作成」をご参照ください。
AccessKeyペアを取得していること。詳細については、「AccessKeyペアの取得」をご参照ください。
統合開発環境 (IDE) がインストールされていること。詳細については、「IntelliJ IDEA」をご参照ください。IntelliJ IDEA または Eclipse を使用できます。このトピックの例では、IntelliJ IDEA を使用します。
Java Development Kit (JDK) がインストールされていること。詳細については、「Java のダウンロード」をご参照ください。
背景情報
次の図は、ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーション間のメッセージングを実装するプロセスを示しています。ApsaraMQ for MQTT クライアントはクライアント SDK を使用でき、バックエンドサービスアプリケーションはクラウド SDK を使用して ApsaraMQ for MQTT にアクセスし、双方向通信を実装できます。
このトピックでは、Apsara for MQTT SDK for Java を使用して、インターネット経由で ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーション間のメッセージングを実装する方法について説明します。メッセージの送受信に使用されるサンプルコードについては、「デモプロジェクト」または「デモ」をご参照ください。
エンドポイント
クライアント SDK とクラウド SDK を使用して ApsaraMQ for MQTT にアクセスする前に、SDK コードで ApsaraMQ for MQTT インスタンスのエンドポイントを指定する必要があります。これにより、ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーションは、エンドポイントを使用して ApsaraMQ for MQTT ブローカーにアクセスできます。
クライアント SDK のエンドポイントの形式
クライアント SDK を使用してクライアントを ApsaraMQ for MQTT に接続する場合、次のいずれかの形式でエンドポイントを指定します。
インターネットアクセスポイント:
ID of the ApsaraMQ for MQTT instance.mqtt.aliyuncs.comVPC アクセスポイント:
ID of the ApsaraMQ for MQTT instance-internal-vpc.mqtt.aliyuncs.com
エンドポイントインスタンスの詳細ApsaraMQ for MQTT コンソールの ページの タブで、クライアント SDK のエンドポイントを表示することもできます。
クラウド SDK のエンドポイントの形式
クラウド SDK を使用してバックエンドサービスアプリケーションを ApsaraMQ for MQTT に接続する場合、次のいずれかの形式でエンドポイントを指定します。
重要クラウド SDK を使用して接続できるのは、カーネルバージョンが V3.3.0 で、中国本土のリージョンにデプロイされている ApsaraMQ for MQTT インスタンスのみです。
インターネットアクセスポイント:
ID of the ApsaraMQ for MQTT instance-server-internet.mqtt.aliyuncs.comVPC アクセスポイント:
ID of the ApsaraMQ for MQTT instance-server-internal.mqtt.aliyuncs.com
ApsaraMQ for MQTT インスタンスの ID基本情報インスタンスの詳細ApsaraMQ for MQTT コンソールの ページの セクションで、 を取得できます。
クライアント SDK とクラウド SDK のエンドポイントは、パブリックエンドポイントまたは VPC エンドポイントにすることができます。パブリックエンドポイントは、インターネットからのアクセス用の IP アドレスであり、IoT およびモバイルインターネットのシナリオで一般的に使用されます。プライベートエンドポイントは、仮想プライベートクラウド (VPC) からのアクセス用の IP アドレスであり、バックエンドサービスアプリケーションが ApsaraMQ for MQTT にアクセスするために一般的に使用されます。
エンドポイントを使用して SDK を ApsaraMQ for MQTT に接続する場合は、IP アドレスではなくドメイン名を使用してください。IP アドレスは予期せず変更される可能性があります。 ApsaraMQ for MQTT 技術チームは、次のシナリオにおける障害および直接的または間接的な損失について責任を負いません。
クライアントまたはバックエンドサービスアプリケーションが、ドメイン名ではなく IP アドレスを使用して ApsaraMQ for MQTT にアクセスしている。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、元の IP アドレスが無効になる。
ApsaraMQ for MQTT クライアントまたはバックエンドサービスアプリケーションが実行されているネットワークで、IP アドレスに対するファイアウォールポリシーが設定されている。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、ファイアウォールポリシーにより新しい IP アドレスがブロックされる。
クライアント SDK を使用してメッセージを送信する
Java 用のサードパーティのオープンソース SDK をダウンロードします。ダウンロードリンク: Eclipse Paho Java Client。
コード開発の参考として、クライアント SDK のデモをダウンロードします。ダウンロードリンク: mqtt-java-demo。
デモプロジェクトパッケージを特定のフォルダーに解凍します。
IntelliJ IDEA で、抽出されたファイルをインポートしてプロジェクトを作成し、pom.xml ファイルに次の依存関係が含まれているかどうかを確認します。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>MQ4IoTProducerDemo.java クラスで、コードの注釈の説明に従ってパラメーターを設定します。ほとんどのパラメーターは、作成した ApsaraMQ for MQTT リソースに関連しています。詳細については、「リソースの作成」をご参照ください。 main() 関数を使用してサンプルコードを実行し、メッセージを送信します。
サンプルコード:
説明 サンプルコードを使用してメッセージを送受信する前に、環境変数を設定して ApsaraMQ for MQTT にアクセスするために使用する認証情報を取得する必要があります。環境変数の設定方法については、「アクセス認証情報を設定する」をご参照ください。ApsaraMQ for MQTT にアクセスするために使用する AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT にアクセスするために使用する AccessKey シークレットの環境変数名は MQTT_SK_ENV です。
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MQ4IoTProducerDemo { public static void main(String[] args) throws Exception { // 作成した ApsaraMQ for MQTT インスタンスの ID。 String instanceId = "XXXXX"; // クライアント SDK のエンドポイント。ApsaraMQ for MQTT コンソールの [インスタンスの詳細] ページの [エンドポイント] タブでエンドポイントを取得できます。 // エンドポイントとして IP アドレスではなくドメイン名を使用してください。そうしないと、クライアント例外が発生する可能性があります。 String endPoint = "XXXXX.mqtt.aliyuncs.com"; // ID 認証のために Alibaba Cloud Resource Access Management (RAM) コンソールで作成した AccessKey ID。 // Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを回避するために、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。 // AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウントに含まれるすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。 // この例では、AccessKey ペアは環境変数に保存されています。 String accessKey = System.getenv("MQTT_AK_ENV"); // ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。AccessKey シークレットは、署名認証にのみ必要です。 String secretKey = System.getenv("MQTT_SK_ENV"); // システムが ApsaraMQ for MQTT クライアントに割り当てるグローバルに一意の ID。クライアント ID は、TCP 接続ごとに異なる必要があります。複数の TCP 接続が同じクライアント ID を使用すると、例外が発生し、接続が予期せず閉じられます。 // クライアント ID は、グループ ID とデバイス ID で構成され、GroupID@@@DeviceID 形式です。グループ ID は、ApsaraMQ for MQTT コンソールで作成するグループの ID です。デバイス ID は、ユーザーが指定するカスタム ID です。クライアント ID は、64 文字を超えることはできません。 String clientId = "GID_XXXXX@@@XXXXX"; // ApsaraMQ for MQTT コンソールで作成した親トピック。 // 存在しないトピック、または ApsaraMQ for MQTT クライアントがアクセスを許可されていないトピックを指定すると、ApsaraMQ for MQTT ブローカーは接続を閉じます。 final String parentTopic = "XXXXX"; // サブトピック。ApsaraMQ for MQTT では、サブトピックを使用してメッセージをフィルタリングできます。サブトピックの名前として文字列を指定できます。 // mq4IotTopic パラメーターの値は、最大 128 文字までです。 final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; // メッセージ送信のサービス品質 (QoS) レベル。有効な値: 0、1、および 2。詳細については、「用語」を参照してください。 final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); // ApsaraMQ for MQTT クライアントが使用するプロトコルとポート。ApsaraMQ for MQTT クライアントが使用するプロトコルとポートは一致する必要があります。Secure Sockets Layer (SSL) 暗号化を使用する場合は、プロトコルとポートを ssl://endpoint:8883 として指定する必要があります。 final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); // ApsaraMQ for MQTT クライアントが応答を待つタイムアウト期間。タイムアウト期間により、ApsaraMQ for MQTT クライアントが無期限に応答を待つことを防ぎます。 mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { // クライアント接続が確立された後、できるだけ早くコンシューマーがサブスクライブする必要があるトピック。 System.out.println("connect success"); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { // パブリッシュされたメッセージを消費するために呼び出されるコールバック。コールバックが例外をスローしないようにしてください。コールバックに対して応答が返された場合、メッセージは消費されます。 // メッセージは指定された期間内に消費される必要があります。メッセージが ApsaraMQ for MQTT ブローカーによって指定されたタイムアウト期間内に消費されない場合、ブローカーはメッセージの再送信を試みる可能性があります。メッセージ消費の冪等性を確保するために、重複除外が実行されていることを確認してください。 System.out.println( "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes()); message.setQos(qosLevel); // メッセージが送信されるトピック。メッセージが通常のメッセージの場合、このトピックはコンシューマーがサブスクライブするトピックと同じであるか、ワイルドカードを使用して一致させることができます。 mqttClient.publish(mq4IotTopic, message); // ApsaraMQ for MQTT は、ポイントツーポイント (P2P) メッセージングをサポートしています。プロデューサーが特定のメッセージを特定のクライアント ID に送信することを知っている場合、プロデューサーは P2P メッセージングを使用できます。 // P2P メッセージングでは、コンシューマーはプロデューサーがメッセージを送信するトピックにサブスクライブする必要はありません。ロジックはコンシューマー側で簡素化されます。P2P メッセージングでは、{{parentTopic}}/p2p/{{targetClientId}} 形式でトピックを指定します。 String receiverId = "xxx"; final String p2pSendTopic = parentTopic + "/p2p/" + receiverId; message = new MqttMessage("hello mq4Iot p2p msg".getBytes()); message.setQos(qosLevel); mqttClient.publish(p2pSendTopic, message); } Thread.sleep(Long.MAX_VALUE); } }
クラウド SDK を使用してメッセージを受信する
ApsaraMQ for MQTT が提供するクラウド SDK をダウンロードします。ダウンロード URL については、「リリースノート」をご参照ください。
コード開発の参考として、クラウド SDK のデモをダウンロードします。ダウンロードリンク: mqtt-server-sdk-demo。
デモプロジェクトパッケージを特定のフォルダーに解凍します。
IntelliJ IDEA で、抽出されたファイルをインポートしてプロジェクトを作成し、pom.xml ファイルに次の依存関係が含まれているかどうかを確認します。
<dependencies> <dependency> <groupId>com.alibaba.mqtt</groupId> <artifactId>server-sdk</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies>MQTTConsumerDemo.java クラスで、コードの注釈の説明に従ってパラメーターを設定します。ほとんどのパラメーターは、作成した ApsaraMQ for MQTT リソースに関連しています。詳細については、「リソースの作成」をご参照ください。 main() 関数を使用してサンプルコードを実行し、メッセージを受信します。
サンプルコード:
説明 サンプルコードを使用してメッセージを送受信する前に、環境変数を設定して ApsaraMQ for MQTT にアクセスするために使用する認証情報を取得する必要があります。環境変数の設定方法については、「アクセス認証情報を設定する」をご参照ください。ApsaraMQ for MQTT にアクセスするために使用する AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT にアクセスするために使用する AccessKey シークレットの環境変数名は MQTT_SK_ENV です。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.MessageListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.MessageProperties; public class MQTTConsumerDemo { public static void main(String[] args) throws Exception { // クラウド SDK のエンドポイント。エンドポイントの形式については、このトピックの「エンドポイント」セクションを参照してください。 // エンドポイントとして IP アドレスではなくドメイン名を使用してください。そうしないと、ブローカー例外が発生する可能性があります。 String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com"; // クラウド SDK が使用するポート。クラウド SDK が使用するプロトコルとポートは一致する必要があります。値を 5672 に設定します。 int port = "5672"; // 作成した ApsaraMQ for MQTT インスタンスの ID。 String instanceId = "post-cn-jaj3h8i****"; // ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey ID。 // Alibaba Cloud アカウントの AccessKey ペアは、すべての API 操作に対する権限を持っています。セキュリティリスクを回避するために、RAM ユーザーを使用して API 操作を呼び出したり、日常的な O&M を実行することをお勧めします。 // AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。そうしないと、AccessKey ペアが漏洩し、アカウントに含まれるすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。 // この例では、AccessKey ペアは環境変数に保存されています。 String accessKey = System.getenv("MQTT_AK_ENV"); // ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。AccessKey シークレットは、署名認証にのみ必要です。 String secretKey = System.getenv("MQTT_SK_ENV"); // ApsaraMQ for MQTT コンソールで作成した親トピック。 // ほとんどの場合、クラウドアプリケーションがメッセージを収集して分析するシナリオで、クラウド SDK を使用してメッセージをサブスクライブします。そのため、クラウド SDK を使用してメッセージをサブスクライブする場合は、サブトピックを指定できません。 // 存在しないトピック、または ApsaraMQ for MQTT クライアントがアクセスを許可されていないトピックを指定すると、ApsaraMQ for MQTT ブローカーは接続を閉じます。 String firstTopic = "firstTopic"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeTopic(firstTopic, new MessageListener() { @Override public void process(String msgId, MessageProperties messageProperties, byte[] payload) { System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload)); } }); } }説明クラウド SDK を使用してメッセージを送信するために使用されるサンプルコードについては、「MQTTProducerDemo.java」をご参照ください。