すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:使用上の注意

最終更新日:Nov 09, 2025

このトピックでは、グローバルレプリケーターの使用方法、パラメーター、およびよくある質問 (FAQ) のリストについて説明します。この情報を使用して、この機能の使用を開始できます。

前提条件

次のいずれかのバージョン要件を満たす ApsaraMQ for RocketMQ インスタンスを作成する必要があります:

  • ApsaraMQ for RocketMQ V4.0 シリーズインスタンス。

    説明

    V4.0 シリーズインスタンスの Standard Edition は、一方向同期のみをサポートします。

  • ApsaraMQ for RocketMQ V5.0 シリーズインスタンス。

このトピックでは、ソースインスタンスと宛先インスタンスとして ApsaraMQ for RocketMQ 5.0 シリーズインスタンスを使用します。

コンピューティング仕様に関する推奨事項

グローバルレプリケーターを使用すると、Topic マッピングを設定した後にメッセージ同期が開始されます。このプロセスにより、インスタンスの 1 秒あたりのトランザクション数 (TPS) のオーバーヘッドが増加します。ソースと宛先の両方の ApsaraMQ for RocketMQ インスタンスのピークメッセージング TPS を考慮する必要があります。

推奨仕様:

たとえば、同期前に ApsaraMQ for RocketMQ インスタンスのピークメッセージ送信 TPS が 10,000 で、ピークメッセージ受信 TPS が 10,000 であるとします。

次の表に、ソースインスタンスと宛先インスタンスの推奨 TPS 構成を示します。

タスクタイプ

ソースインスタンス

宛先インスタンス

ピークメッセージング TPS

ピークメッセージ送信 TPS

ピークメッセージ受信 TPS

ピークメッセージング TPS

ピークメッセージ送信 TPS

ピークメッセージ受信 TPS

一方向同期

30000

10000

20000

20000

10000

10000

双方向同期

40000

20000

20000

40000

20000

20000

説明
  • インスタンスのピークメッセージング TPS は、[インスタンス詳細] ページで表示できます。さまざまなインスタンスタイプのピークメッセージング TPS の詳細については、「インスタンス仕様」をご参照ください。

  • 必要な処理速度に基づいて、宛先インスタンスのピークメッセージ受信 TPS を調整できます。

グローバルレプリケータータスクの作成

説明

初めてグローバルレプリケーターを使用すると、[Alibaba Cloud サービス承認 - RocketMQ グローバルレプリケーター] ダイアログボックスが表示されます。[OK] をクリックします。すると、システムは自動的に AliyunServiceRoleForRMQDisasterRecovery サービスリンクロールを作成します。ApsaraMQ for RocketMQ はこのロールを偽装して、グローバルレプリケーター機能を有効にできます。詳細については、「サービスリンクロール」をご参照ください。

ステップ 1: タスクの作成

  1. ApsaraMQ for RocketMQ コンソールにログオンします。
  2. 左側のナビゲーションウィンドウで、[グローバルレプリケーター] をクリックし、[タスクの作成] をクリックします。

  3. [タスクの作成] ページで、パラメーターを設定します。次の表に、主要なパラメーターを示します。必要に応じて他のパラメーターを設定できます。次に、[作成] をクリックします。

    image

    次の表に、さまざまなクラスタータイプのパラメーターを示します。

    ApsaraMQ for RocketMQ

    設定項目

    説明

    バージョン

    ApsaraMQ for RocketMQ インスタンスのバージョンを選択します。

    • 4.0 シリーズインスタンス

    • 5.0 シリーズインスタンス

    5.0 シリーズインスタンス

    リージョン

    ApsaraMQ for RocketMQ インスタンスのリージョンを選択します。

    中国 (杭州)

    インスタンス

    ApsaraMQ for RocketMQ インスタンスを選択します。

    rmq-cn-778***

    認証モード

    • 設定不要

    • ACL ベースの認証

    ACL ベースの認証

    ユーザー名

    ユーザー名を選択します。

    yS8x01****

    セキュリティグループ ID

    ApsaraMQ for RocketMQ インスタンスが配置されている ECS サービスのセキュリティグループ。このパラメーターは、[バージョン] を 5.0 シリーズインスタンスに設定した場合にのみ必要です。

    • インバウンド: 要件なし。

    • アウトバウンド:

      • 認証ポリシー: 許可

      • プロトコル: カスタム TCP

      • 宛先: CIDR ブロックには、クラスター vSwitch のすべての CIDR ブロックを含める必要があります。ポートには 8080 と 8081 を含める必要があります。

    sg-2ze4jlbqy2s40pc4****

    フィルター条件のシステム構成

    (オプション) メッセージにカスタム属性 (UserProperty) を追加します。コンシューマーは SQL92 サブスクリプションポリシーを使用してメッセージをフィルターできます。

    • [キー]: UserProperty のキー。

    • [ソースクラスター値]: ソースクラスターからのメッセージに追加される UserProperty の値。

    • [宛先クラスター値]: 宛先クラスターからのメッセージに追加される UserProperty の値。

    次のサンプルコードは、SQL92 構文に基づいてサブスクリプションをフィルターする方法を示しています:

    String sqlStr = "{Key} IS NOT NULL AND {Key} = {Source/Destination Cluster Value}";
    consumer.subscribe(MqConfig.TOPIC, 
        MessageSelector.bySql(sqlStr));

    なし

    Apache RocketMQ

    設定項目

    説明

    エンドポイント

    エンドポイントのフォーマットは {ドメイン名/IP}:{ポート} です。

    XX.XX.XX.XX:8080

    認証モード

    • 設定不要

    • ACL ベースの認証

    ACL ベースの認証

    ユーザー名

    必要な情報を入力します。

    yS8x01****

    パスワード

    必要に応じて情報を入力します。

    F17R4to****

    ネットワーク設定

    • インターネット

    • VPC

    インターネット

    リージョン

    [ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスのリージョンを選択します。

    中国 (杭州)

    VPC ネットワーク

    [ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスの VPC。

    vpc-bp17fapfdj0dwzjkd****

    VSwitch

    [ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスの vSwitch。

    vsw-bp1gbjhj53hdjdkg****

    セキュリティグループ ID

    [ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスのセキュリティグループ。

    • インバウンド: 要件なし。

    • アウトバウンド:

      • 認証ポリシー: 許可

      • プロトコル: カスタム TCP

      • 宛先: CIDR ブロックには、すべての Apache RocketMQ ノードのすべての CIDR ブロックを含める必要があります。ポートには、Apache RocketMQ ノードのすべてのサービスポートを含める必要があります。デフォルトのポートは 9876、10911、および 10909 です。ネームサーバーまたはブローカーのポートを変更した場合は、変更後のポートを指定します。

    sg-2ze4jlbqy2s40pc4****

    フィルター条件のシステム構成

    (オプション) メッセージにカスタム属性 (UserProperty) を追加します。コンシューマーは SQL92 サブスクリプションポリシーを使用してメッセージをフィルターできます。

    • キー: UserProperty のキー。

    • ソースクラスター値: ソースクラスターからのメッセージに追加される UserProperty の値。

    • 宛先クラスター値: 宛先クラスターからのメッセージに追加される UserProperty の値。

    次のサンプルコードは、SQL92 構文に基づいてサブスクリプションをフィルターする方法を示しています:

    String sqlStr = "{Key} IS NOT NULL AND {Key} = {Source/Destination Cluster Value}";
    consumer.subscribe(MqConfig.TOPIC, 
        MessageSelector.bySql(sqlStr));

    なし

ステップ 2: Topic マッピングの設定

  1. [タスク基本情報] ページで、左側のナビゲーションウィンドウにある [メッセージ同期タスク] をクリックします。

  2. [一括マッピング] または [マッピングの追加] をクリックします。表示されるダイアログボックスで、同期のための Topic マッピングを設定します。

    • 一括マッピング: 必要な Topic マッピングを設定して選択し、[OK] をクリックします。

    • マッピングの追加: 必要な Topic マッピングを設定し、[OK] をクリックします。

    説明
    • ソースクラスターが Apache RocketMQ クラスターである場合、メッセージ同期のためのグループを指定する必要があります。すでにサービス中のグループは使用しないでください。

    • メッセージは、Topic マッピングを設定した後にのみ同期されます。

ステップ 3: (オプション) コンシューマー進捗同期の有効化

コンシューマー進捗同期を有効にすると、Topic のグループの現在のコンシューマー進捗を宛先クラスターに同期できます。詳細については、「コンシューマー進捗同期」をご参照ください。

コンシューマークライアントの説明

この例では、双方向同期タスクが作成されます。ソースクラスターは上海の ApsaraMQ for RocketMQ インスタンスで、宛先クラスターは杭州の ApsaraMQ for RocketMQ インスタンスです。フィルター条件では、キーはリージョンに、ソースクラスター値は上海に、宛先クラスター値は杭州に設定されています。

通常、上海のコンシューマークライアントは上海リージョンのデータのみを消費し、杭州リージョンのデータは消費しません。したがって、メッセージを照合するには、単一のカスタム属性を指定するだけで十分です。詳細については、「イベントパターン」をご参照ください。

  • メッセージをサブスクライブし、単一のカスタム属性に基づいてメッセージを照合します。

String topic = "topic";
// ローカルリージョンのメッセージのみをサブスクライブします。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL", FilterExpressionType.SQL92);
// ローカルのプロデューサーアプリケーションはメッセージ送信時に UserProperty キーを Region に設定しなかったため、キー Region のカスタム属性は NULL です。
simpleConsumer.subscribe(topic, filterExpression);
  • メッセージをサブスクライブし、Topic 内のすべてのメッセージを照合し、メッセージをフィルターしません。

String topic = "topic";
// すべてのメッセージをサブスクライブします。
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);

よくある質問

データは 3 つのインスタンス間で同期できますか?

はい、できます。単一のグローバルレプリケータータスクは 2 つのインスタンス間でデータを同期します。3 つのインスタンス間でデータを同期するには、3 つの個別のタスクを設定する必要があります。これにより、各インスタンスに完全なデータセットが含まれるようになります。

双方向同期用に設定されたインスタンスで、タグベースのフィルタリングをまだ使用できますか?

いいえ、できません。タグベースまたは SQL ベースのいずれか 1 つのフィルタリング方法しか使用できません。このシナリオのコンシューマーアプリケーションは SQL ベースのフィルタリングを必要とするため、通常はタグを使用するインスタンスであっても、すべてのインスタンスで SQL ベースのフィルタリングを使用する必要があります。詳細については、「メッセージフィルタリング」をご参照ください。

既存の一方向同期タスクを双方向同期タスクに変更できますか?

ソースクラスターと宛先クラスターの両方が ApsaraMQ for RocketMQ インスタンスである場合は、タスクを変更できます。これは、実行中の一方向タスクには影響しません。クラスターの 1 つがオープンソースの Apache RocketMQ クラスターである場合、一方向同期タスクを双方向同期タスクに変更することはできません。タスクを削除して新しいタスクを作成する必要があります。

グローバルレプリケーターはどのように課金されますか?

グローバルレプリケーターの同期チャネルは EventBridge に依存しています。EventBridge の課金の詳細については、「課金」をご参照ください。グローバルレプリケーターは、ソース ApsaraMQ for RocketMQ インスタンスの読み取りパフォーマンスと宛先インスタンスの書き込みパフォーマンスを消費します。したがって、ApsaraMQ for RocketMQ インスタンスのコンピューティング仕様を評価する必要があります。

グローバルレプリケーターに ACL ベースの認証を設定した後、同期されたメッセージが受信されないのはなぜですか?

この問題は、必要な権限が付与されていないために発生します。同期タスクを設定すると、システムはメッセージデータを同期するために Topic ごとにシステムグループを生成します。このグループは削除できず、対応するタスクが削除された場合にのみ削除されます。アクセス制御リスト (ACL) ベースの認証を有効にする場合は、ACL ユーザーに Topic への読み取りおよび書き込み権限と、システムグループからの読み取り権限を付与する必要があります。権限を付与するには、インスタンスコンソールの [リソースアクセス管理] > [ACL 権限] ページに移動し、ユーザーを選択して、[リソース名] をワイルドカード (*) に設定してアカウントに完全な権限を付与します。この方法により、設定の作業負荷を軽減できます。

グローバルレプリケータータスクが一時停止されてから再開された場合、メッセージ同期は続行できますか?

はい、できます。タスクが一時停止された後に再開されると、グローバルレプリケータータスクは、最後に確認されたオフセットの次のメッセージからデータの同期を続行します。ApsaraMQ for RocketMQ インスタンスのメッセージ保持期間を超えていないことを確認してください。