すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for MQTT:Java 用 ApsaraMQ for MQTT SDK を使用して ApsaraMQ for MQTT クライアント間のメッセージングを実装する

最終更新日:Jan 14, 2025

このトピックでは、Java 用 ApsaraMQ for MQTT SDK を使用して ApsaraMQ for MQTT クライアント間でメッセージングを実装する方法について説明します。

前提条件

  • リソースを作成する
  • AccessKey ペアを取得済みであること.
  • 統合開発環境 (IDE) がインストールされていること。詳細については、IDE をご参照ください。IntelliJ IDEA または Eclipse を使用できます。この例では、IntelliJ IDEA を使用します。

  • Java Development Kit (JDK) がインストールされていること。詳細については、JDK をご参照ください。

背景情報

ApsaraMQ for MQTT は、ApsaraMQ for MQTT クライアントが相互にやり取りするシナリオに適用されます。これらのシナリオでは、プロデューサーとコンシューマーの両方が ApsaraMQ for MQTT クライアントです。各 ApsaraMQ for MQTT クライアントは、ApsaraMQ for MQTT のクライアント SDK を使用して ApsaraMQ for MQTT ブローカーに接続し、メッセージングを行います。

Messaging between clients

このトピックでは、Java 用 ApsaraMQ for MQTT SDK を使用して、インターネット経由で ApsaraMQ for MQTT クライアント間でメッセージングを実装する方法について説明します。

エンドポイント

クライアント SDK とクラウド SDK を使用して ApsaraMQ for MQTT にアクセスする前に、SDK コードで ApsaraMQ for MQTT インスタンスのエンドポイントを指定する必要があります。これにより、ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーションは、エンドポイントを使用して ApsaraMQ for MQTT ブローカーにアクセスできます。

  • クライアント SDK 用エンドポイントの形式

    クライアント SDK を使用してクライアントを ApsaraMQ for MQTT に接続する場合は、次のいずれかの形式でエンドポイントを指定します。

    • インターネットアクセスポイント: ID of the ApsaraMQ for MQTT instance.mqtt.aliyuncs.com

    • VPC アクセスポイント: ID of the ApsaraMQ for MQTT instance-internal-vpc.mqtt.aliyuncs.com

    エンドポイントインスタンスの詳細ApsaraMQ for MQTT コンソール の ページの タブで、クライアント SDK 用のエンドポイントを表示することもできます。

  • クラウド SDK 用エンドポイントの形式

    クラウド SDK を使用してバックエンドサービスアプリケーションを ApsaraMQ for MQTT に接続する場合は、次のいずれかの形式でエンドポイントを指定します。

    重要

    クラウド SDK を使用して接続できるのは、カーネルバージョンが V3.3.0 で、中国本土のリージョンにデプロイされている ApsaraMQ for MQTT インスタンスのみです。

    • インターネットアクセスポイント: ID of the ApsaraMQ for MQTT instance-server-internet.mqtt.aliyuncs.com

    • VPC アクセスポイント: ID of the ApsaraMQ for MQTT instance-server-internal.mqtt.aliyuncs.com

説明

ApsaraMQ for MQTT インスタンスの ID基本情報インスタンスの詳細ApsaraMQ for MQTT コンソール の ページの セクションで、 を取得できます。

クライアント SDK とクラウド SDK のエンドポイントは、パブリックエンドポイントまたは VPC エンドポイントにすることができます。パブリックエンドポイントは、インターネットからのアクセス用の IP アドレスであり、IoT およびモバイルインターネットのシナリオで一般的に使用されます。プライベートエンドポイントは、仮想プライベートクラウド (VPC) からのアクセス用の IP アドレスであり、バックエンドサービスアプリケーションが ApsaraMQ for MQTT にアクセスするために一般的に使用されます。

重要

エンドポイントを使用して SDK を ApsaraMQ for MQTT に接続する場合は、IP アドレスではなくドメイン名を使用してください。IP アドレスは予期なく変更される可能性があります。ApsaraMQ for MQTT 技術チームは、次のシナリオにおける障害および直接的または間接的な損失について責任を負いません。

  • クライアントまたはバックエンドサービスアプリケーションが、ドメイン名ではなく IP アドレスを使用して ApsaraMQ for MQTT にアクセスしている。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、元の IP アドレスが無効になる。

  • ApsaraMQ for MQTT クライアントまたはバックエンドサービスアプリケーションが実行されているネットワークで、IP アドレスに対するファイアウォールポリシーが設定されている。ApsaraMQ for MQTT の技術チームがドメイン名解決を更新した後、ファイアウォールポリシーにより新しい IP アドレスがブロックされる。

Java 用 SDK を呼び出してメッセージを送受信する

  1. サードパーティのオープンソース Java 用 SDK をダウンロードします。ダウンロードリンク: Eclipse Paho Java Client

  2. コード開発中に参照するために、Java 用 ApsaraMQ for MQTT SDK のデモをダウンロードします。ダウンロードリンク: mqtt-java-demo

  3. デモプロジェクトパッケージを特定のフォルダーに解凍します。

  4. IntelliJ IDEA で、抽出されたファイルをインポートしてプロジェクトを作成し、pom.xml ファイルに次の依存関係が含まれているかどうかを確認します。
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java クラスで、コードコメントに基づいてパラメーターを設定します。これらのパラメーターのほとんどは、作成した ApsaraMQ for MQTT リソースに基づいて設定できます。詳細については、「リソースを作成する」をご参照ください。main() 関数を使用してサンプルコードを実行し、メッセージを送受信します。
    サンプルコード:
    説明 サンプルコードを使用してメッセージを送受信する前に、環境変数を構成して、ApsaraMQ for MQTT へのアクセスに使用する認証情報を取得する必要があります。環境変数の構成方法については、「アクセス認証情報を構成する」をご参照ください。

    ApsaraMQ for MQTT にアクセスするために使用する AccessKey ID の環境変数名は MQTT_AK_ENV で、ApsaraMQ for MQTT にアクセスするために使用する AccessKey シークレットの環境変数名は MQTT_SK_ENV です。

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            // ApsaraMQ for MQTT コンソールで作成した ApsaraMQ for MQTT インスタンスの ID。
            String instanceId = "XXXXX";
            // ApsaraMQ for MQTT インスタンスのエンドポイント。エンドポイントは、ApsaraMQ for MQTT コンソールの [インスタンスの詳細] ページで取得できます。
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            // ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey ID。
            // Alibaba Cloud アカウントの AccessKey ペアを使用して、すべての API 操作にアクセスできます。セキュリティの問題を防ぐため、RAM ユーザーを使用して API 操作にアクセスし、日常の O&M を実行することをお勧めします。
            // AccessKey ペアをプロジェクトコードに保存しないことを強くお勧めします。保存すると、AccessKey ペアが漏洩し、アカウントに含まれるすべてのリソースが潜在的なセキュリティリスクにさらされる可能性があります。
            // この例では、AccessKey ペアは環境変数に保存されています。
            String accessKey = System.getenv("MQTT_AK_ENV");
            // ID 認証のために Alibaba Cloud RAM コンソールで作成した AccessKey シークレット。AccessKey シークレットは、署名認証モードを使用する場合にのみ必要です。
            String secretKey = System.getenv("MQTT_SK_ENV");
            // システムが ApsaraMQ for MQTT クライアントに割り当てるグローバルに一意の ID。クライアント ID は、TCP 接続ごとに異なる必要があります。複数の TCP 接続で同じクライアント ID を使用すると、例外が発生し、接続が予期せず閉じられます。
            // clientId パラメーターの値は、GroupID@@@DeviceID 形式です。GroupID は、ApsaraMQ for MQTT コンソールで作成したグループ ID を示し、DeviceID はデバイスのカスタム ID を示します。clientId パラメーターの値は、最大 64 文字の長さにすることができます。
            String clientId = "GID_XXXXX@@@XXXXX";
            // ApsaraMQ for MQTT コンソールで作成した親トピック。
            // 存在しないトピック、または ApsaraMQ for MQTT クライアントがアクセスを許可されていないトピックを指定すると、ApsaraMQ for MQTT ブローカーは接続を閉じます。
            final String parentTopic = "XXXXX";
            // ApsaraMQ for MQTT では、サブトピックを使用してメッセージをフィルタリングできます。文字列を値として指定できます。次のコードは例を示しています。
            // mq4IotTopic パラメーターの値は、最大 128 文字の長さにすることができます。
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            // メッセージ送信時のサービス品質 (QoS) レベル。有効な値: 0、1、および 2。詳細については、「用語」をご参照ください。
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            // ApsaraMQ for MQTT クライアントが使用するプロトコルとポート。ApsaraMQ for MQTT クライアントが使用するプロトコルとポートは一致する必要があります。SSL 暗号化を使用する場合は、プロトコルとポートを ssl://endpoint:8883 に設定する必要があります。
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            // ApsaraMQ for MQTT クライアントが応答を待つタイムアウト期間。タイムアウト期間を設定することで、ApsaraMQ for MQTT クライアントが無期限に応答を待つことを防ぎます。
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    // クライアント接続が確立されたらすぐにサブスクライブする必要があるトピック。
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String topicFilter[] = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    // メッセージを消費するために呼び出されるコールバック。コールバックが例外をスローしないようにしてください。コールバックに対して応答が返された場合、メッセージは消費されます。
                    // メッセージは指定された期間内に消費される必要があります。ApsaraMQ for MQTT ブローカーによって指定されたタイムアウト期間内にメッセージが消費されない場合、ブローカーはメッセージの再送信を試みる可能性があります。メッセージ消費の冪等性を確保するために重複排除が実行されていることを確認してください。
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                // 通常のメッセージが送信されるトピック。このトピックは、コンシューマーがサブスクライブするトピックと同じであるか、ワイルドカードを使用して一致させることができる必要があります。
                mqttClient.publish(mq4IotTopic, message);
                // ApsaraMQ for MQTT は、ポイントツーポイント (P2P) メッセージングをサポートしています。プロデューサーが特定のクライアント ID を持つコンシューマーのみがメッセージを必要としていることを確認した場合、プロデューサーは P2P メッセージをコンシューマーに送信できます。
                // P2P メッセージングでは、コンシューマーはプロデューサーがメッセージを送信するトピックをサブスクライブする必要はありません。これにより、消費ロジックが簡素化されます。P2P メッセージングでは、{{parentTopic}}/p2p/{{targetClientId}} 形式でトピックを指定します。
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

結果を確認する

プロデューサーがメッセージを送信し、コンシューマーがメッセージをサブスクライブした後、ApsaraMQ for MQTT コンソールでメッセージのトレースをクエリして、メッセージが送受信されたかどうかを確認できます。詳細については、「メッセージトレースをクエリする」をご参照ください。

関連情報

Java 用 ApsaraMQ for MQTT SDK を使用して ApsaraMQ for MQTT クライアントとバックエンドサービスアプリケーション間のメッセージングを実装する