このトピックでは、グローバルレプリケーターの使用方法、パラメーター、およびよくある質問 (FAQ) のリストについて説明します。この情報を使用して、この機能の使用を開始できます。
前提条件
次のいずれかのバージョン要件を満たす ApsaraMQ for RocketMQ インスタンスを作成する必要があります:
ApsaraMQ for RocketMQ V4.0 シリーズインスタンス。
説明V4.0 シリーズインスタンスの Standard Edition は、一方向同期のみをサポートします。
ApsaraMQ for RocketMQ V5.0 シリーズインスタンス。
このトピックでは、ソースインスタンスと宛先インスタンスとして ApsaraMQ for RocketMQ 5.0 シリーズインスタンスを使用します。
コンピューティング仕様に関する推奨事項
グローバルレプリケーターを使用すると、Topic マッピングを設定した後にメッセージ同期が開始されます。このプロセスにより、インスタンスの 1 秒あたりのトランザクション数 (TPS) のオーバーヘッドが増加します。ソースと宛先の両方の ApsaraMQ for RocketMQ インスタンスのピークメッセージング TPS を考慮する必要があります。
推奨仕様:
たとえば、同期前に ApsaraMQ for RocketMQ インスタンスのピークメッセージ送信 TPS が 10,000 で、ピークメッセージ受信 TPS が 10,000 であるとします。
次の表に、ソースインスタンスと宛先インスタンスの推奨 TPS 構成を示します。
タスクタイプ | ソースインスタンス | 宛先インスタンス | ||||
ピークメッセージング TPS | ピークメッセージ送信 TPS | ピークメッセージ受信 TPS | ピークメッセージング TPS | ピークメッセージ送信 TPS | ピークメッセージ受信 TPS | |
一方向同期 | 30000 | 10000 | 20000 | 20000 | 10000 | 10000 |
双方向同期 | 40000 | 20000 | 20000 | 40000 | 20000 | 20000 |
インスタンスのピークメッセージング TPS は、[インスタンス詳細] ページで表示できます。さまざまなインスタンスタイプのピークメッセージング TPS の詳細については、「インスタンス仕様」をご参照ください。
必要な処理速度に基づいて、宛先インスタンスのピークメッセージ受信 TPS を調整できます。
グローバルレプリケータータスクの作成
初めてグローバルレプリケーターを使用すると、[Alibaba Cloud サービス承認 - RocketMQ グローバルレプリケーター] ダイアログボックスが表示されます。[OK] をクリックします。すると、システムは自動的に AliyunServiceRoleForRMQDisasterRecovery サービスリンクロールを作成します。ApsaraMQ for RocketMQ はこのロールを偽装して、グローバルレプリケーター機能を有効にできます。詳細については、「サービスリンクロール」をご参照ください。
ステップ 1: タスクの作成
- ApsaraMQ for RocketMQ コンソールにログオンします。
左側のナビゲーションウィンドウで、[グローバルレプリケーター] をクリックし、[タスクの作成] をクリックします。
[タスクの作成] ページで、パラメーターを設定します。次の表に、主要なパラメーターを示します。必要に応じて他のパラメーターを設定できます。次に、[作成] をクリックします。

次の表に、さまざまなクラスタータイプのパラメーターを示します。
ApsaraMQ for RocketMQ
設定項目
説明
例
バージョン
ApsaraMQ for RocketMQ インスタンスのバージョンを選択します。
4.0 シリーズインスタンス
5.0 シリーズインスタンス
5.0 シリーズインスタンス
リージョン
ApsaraMQ for RocketMQ インスタンスのリージョンを選択します。
中国 (杭州)
インスタンス
ApsaraMQ for RocketMQ インスタンスを選択します。
rmq-cn-778***
認証モード
設定不要
ACL ベースの認証
ACL ベースの認証
ユーザー名
ユーザー名を選択します。
yS8x01****
セキュリティグループ ID
ApsaraMQ for RocketMQ インスタンスが配置されている ECS サービスのセキュリティグループ。このパラメーターは、[バージョン] を 5.0 シリーズインスタンスに設定した場合にのみ必要です。
インバウンド: 要件なし。
アウトバウンド:
認証ポリシー: 許可
プロトコル: カスタム TCP
宛先: CIDR ブロックには、クラスター vSwitch のすべての CIDR ブロックを含める必要があります。ポートには 8080 と 8081 を含める必要があります。
sg-2ze4jlbqy2s40pc4****
フィルター条件のシステム構成
(オプション) メッセージにカスタム属性 (UserProperty) を追加します。コンシューマーは SQL92 サブスクリプションポリシーを使用してメッセージをフィルターできます。
[キー]: UserProperty のキー。
[ソースクラスター値]: ソースクラスターからのメッセージに追加される UserProperty の値。
[宛先クラスター値]: 宛先クラスターからのメッセージに追加される UserProperty の値。
次のサンプルコードは、SQL92 構文に基づいてサブスクリプションをフィルターする方法を示しています:
String sqlStr = "{Key} IS NOT NULL AND {Key} = {Source/Destination Cluster Value}"; consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql(sqlStr));なし
Apache RocketMQ
設定項目
説明
例
エンドポイント
エンドポイントのフォーマットは
{ドメイン名/IP}:{ポート}です。XX.XX.XX.XX:8080
認証モード
設定不要
ACL ベースの認証
ACL ベースの認証
ユーザー名
必要な情報を入力します。
yS8x01****
パスワード
必要に応じて情報を入力します。
F17R4to****
ネットワーク設定
インターネット
VPC
インターネット
リージョン
[ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスのリージョンを選択します。
中国 (杭州)
VPC ネットワーク
[ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスの VPC。
vpc-bp17fapfdj0dwzjkd****
VSwitch
[ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスの vSwitch。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ ID
[ネットワーク設定] を VPC に設定した場合にこのパラメーターを設定します。Apache RocketMQ クラスターが配置されている ECS サービスのセキュリティグループ。
インバウンド: 要件なし。
アウトバウンド:
認証ポリシー: 許可
プロトコル: カスタム TCP
宛先: CIDR ブロックには、すべての Apache RocketMQ ノードのすべての CIDR ブロックを含める必要があります。ポートには、Apache RocketMQ ノードのすべてのサービスポートを含める必要があります。デフォルトのポートは 9876、10911、および 10909 です。ネームサーバーまたはブローカーのポートを変更した場合は、変更後のポートを指定します。
sg-2ze4jlbqy2s40pc4****
フィルター条件のシステム構成
(オプション) メッセージにカスタム属性 (UserProperty) を追加します。コンシューマーは SQL92 サブスクリプションポリシーを使用してメッセージをフィルターできます。
キー: UserProperty のキー。
ソースクラスター値: ソースクラスターからのメッセージに追加される UserProperty の値。
宛先クラスター値: 宛先クラスターからのメッセージに追加される UserProperty の値。
次のサンプルコードは、SQL92 構文に基づいてサブスクリプションをフィルターする方法を示しています:
String sqlStr = "{Key} IS NOT NULL AND {Key} = {Source/Destination Cluster Value}"; consumer.subscribe(MqConfig.TOPIC, MessageSelector.bySql(sqlStr));なし
ステップ 2: Topic マッピングの設定
[タスク基本情報] ページで、左側のナビゲーションウィンドウにある [メッセージ同期タスク] をクリックします。
[一括マッピング] または [マッピングの追加] をクリックします。表示されるダイアログボックスで、同期のための Topic マッピングを設定します。
一括マッピング: 必要な Topic マッピングを設定して選択し、[OK] をクリックします。
マッピングの追加: 必要な Topic マッピングを設定し、[OK] をクリックします。
説明ソースクラスターが Apache RocketMQ クラスターである場合、メッセージ同期のためのグループを指定する必要があります。すでにサービス中のグループは使用しないでください。
メッセージは、Topic マッピングを設定した後にのみ同期されます。
ステップ 3: (オプション) コンシューマー進捗同期の有効化
コンシューマー進捗同期を有効にすると、Topic のグループの現在のコンシューマー進捗を宛先クラスターに同期できます。詳細については、「コンシューマー進捗同期」をご参照ください。
コンシューマークライアントの説明
この例では、双方向同期タスクが作成されます。ソースクラスターは上海の ApsaraMQ for RocketMQ インスタンスで、宛先クラスターは杭州の ApsaraMQ for RocketMQ インスタンスです。フィルター条件では、キーはリージョンに、ソースクラスター値は上海に、宛先クラスター値は杭州に設定されています。
通常、上海のコンシューマークライアントは上海リージョンのデータのみを消費し、杭州リージョンのデータは消費しません。したがって、メッセージを照合するには、単一のカスタム属性を指定するだけで十分です。詳細については、「イベントパターン」をご参照ください。
メッセージをサブスクライブし、単一のカスタム属性に基づいてメッセージを照合します。
String topic = "topic";
// ローカルリージョンのメッセージのみをサブスクライブします。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL", FilterExpressionType.SQL92);
// ローカルのプロデューサーアプリケーションはメッセージ送信時に UserProperty キーを Region に設定しなかったため、キー Region のカスタム属性は NULL です。
simpleConsumer.subscribe(topic, filterExpression);メッセージをサブスクライブし、Topic 内のすべてのメッセージを照合し、メッセージをフィルターしません。
String topic = "topic";
// すべてのメッセージをサブスクライブします。
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);