変更追跡タスクを設定した後、Data Transmission Service (DTS) が提供するSDKデモコードを使用して、追跡されたデータを使用できます。 このトピックでは、SDKデモコードを使用して、PloarDB-X 1.0やデータ管理 (DMS) LogicDBなどの分散データベースから追跡データを消費する方法について説明します。
前提条件
1.8バージョンのJava Development Kit (JDK) がインストールされています。
IntelliJ IDEAがインストールされています。
注意事項
Resource Access Management (RAM) ユーザーを使用してデータを追跡する場合、RAMユーザーはAliyunDTSFullAccess権限とソースオブジェクトにアクセスするための権限を持っている必要があります。 詳細については、「システムポリシーを使用してRAMユーザーにDTSインスタンスを管理する権限を付与する」および「RAM ユーザーへの権限付与」をご参照ください。
手順
このトピックでは、SDKデモコードを使用して、PolarDB-X 1.0インスタンスから追跡されたデータを消費する方法について説明します。 この例では、IntelliJ IDEA (Community Edition 2020.1 for Windows) が使用されています。
変更追跡インスタンスを作成します。 詳細については、「PolarDB-X 1.0インスタンスからのデータ変更の追跡」をご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
SDKデモコードパッケージをダウンロードし、パッケージを解凍します。
IntelliJ IDEAでターゲットプロジェクトを開きます。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。
表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動します。 次に、フォルダを開き、pom.xmlファイルをダブルクリックします。
表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。
IntelliJ IDEAウィンドウで、フォルダを展開してJavaファイルを見つけます。 次に、メソッドを使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。このシナリオでは、DistributedDTSSuumerDemoを選択します。
Javaファイルのコードに必要なパラメーターを設定します。
public static void main(String[] args) がClientException { // PolarDB-X 1.0インスタンスなどの分散データベースの変更追跡タスクを設定します。 AccessKey、DTSインスタンス、DTSジョブ、およびコンシューマーグループに関連するパラメーターを指定します。 文字列accessKeyId = "LTA *********** 99reZ"; 文字列accessKeySecret = "****************"; String regionId = "cn-hangzhou"; 文字列dtsInstanceId = "dtse5212sed162 ****"; 文字列jobId = "l791216x16d ****"; 文字列sid = "dtsip412t13160 ****"; 文字列userName = "xftest"; 文字列パスワード="******"; 文字列proxyUrl = "dts-cn-****.com:18001"; // 最初のシークの初期チェックポイント (設定するタイムスタンプ、必要に応じて1566180200 (月8月19日10:03:21 CST 2019)) String checkpoint = "1639620090"; // 物理データベース /テーブル名を論理データベース /テーブル名に変換する boolean mapping = true; // 開始時に強制的に設定チェックポイントを使用する場合。 チェックポイントリセットの場合、割り当てモードのみが機能します ブールisForceUseInitCheckpoint = false; ConsumerContext.ConsumerSubscribeMode subcribeMode=ConsumerContext.ConsumerSubscribeMode.ASSIGN; DistributedDTSCusumerDemoデモ=new DistributedDTSCusumerDemo (userName、password、regionId、 jobId、sid、dtsInstanceId、accessKeyId、accessKeySecret、subcribeMode、proxyUrl、 チェックポイント、isForceUseInitCheckpoint、マッピング); demo.start(); }
パラメーター
説明
取得する方法
accessKeyId
AccessKey ID。
AccessKey IDを取得する方法の詳細については、「AccessKeyペアの作成と取得」をご参照ください。
accessKeySecret
AccessKeyシークレット。
regionId
変更追跡インスタンスが存在するリージョンのID。
新しいDTSコンソールで、インスタンスIDをクリックします。 [タスク管理] ページで、インスタンスリージョン情報を取得できます。 たとえば、インスタンスが中国 (杭州) リージョンにある場合、パラメーターを
cn-Hangzhou
に設定します。 詳細については、「サポート対象リージョンの一覧」をご参照ください。dtsInstanceId
変更追跡インスタンスのID。
新しいDTSコンソールで、インスタンスIDをクリックします。 [タスク管理] ページで、インスタンスIDとタスクIDを取得できます。
jobId
変更追跡タスクのID。
sid
コンシューマーグループの ID です。
新しいDTSコンソールで、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、[データの使用] をクリックします。 Sidと対応するアカウントを取得できます。
説明コンシューマーグループアカウントのパスワードは、コンシューマーグループを作成するときに自動的に指定されます。
userName
消費者グループのアカウント。
パスワード
コンシューマーグループのアカウントに対応するパスワード。
proxyUrl
変更追跡インスタンスのエンドポイントとポート番号。
説明内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするECSインスタンスがクラシックネットワークまたは変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) に属している場合に適用されます。
新しいDTSコンソールで、インスタンスIDをクリックします。 [タスク管理] ページで、エンドポイントとポート番号を取得できます。
チェックポイント
消費者オフセット。 SDKクライアントが最初のデータレコードを消費したときのタイムスタンプです。 値はUNIXタイムスタンプ (秒単位) です。
説明コンシューマオフセットは、次のシナリオで使用できます。
消費プロセスが中断された後、消費者オフセットを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。
変更追跡クライアントを起動するときに、オンデマンドでデータを消費するための消費者オフセットを指定できます。
コンシューマオフセットは、変更追跡インスタンスのデータ範囲内である必要があります。 コンシューマオフセットは、UNIXタイムスタンプに変換する必要があります。
説明検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。
IntelliJ IDEAの上部メニューバーで、 を選択してクライアントを実行します。
説明IntelliJ IDEAを初めて実行するときは、関連する依存関係を読み込んでインストールするのに時間がかかります。
実行結果は、SDKクライアントがソースインスタンスからのデータ変更を追跡できる結果を示しています。
SDKクライアントは、消費されたデータに関する統計を定期的に収集して表示します。 統計情報には、送受信されたデータレコードの総数、データの総量、および1秒あたりのリクエスト数 (RPS) が含まれます。
表 1. 次の表に、情報のパラメーターを示します。
パラメーター
説明
outCounts
SDKクライアントによって消費されたデータレコードの総数。
outBytes
SDKクライアントによって消費されたデータの合計量。 単位はバイトです。
outRps
SDKクライアントがデータを消費するときのRPSの数。
outBps
SDKクライアントがデータを消費するときに1秒あたりに送信されるビット数。
カウント
なし
inBytes
DTSサーバーによって送信されるデータの合計量。 単位はバイトです。
DStoreRecordQueue
DTSサーバーがデータを送信するときの現在のデータキャッシュキューのサイズ。
カウント
DTSサーバーによって送信されたデータレコードの総数。
inRps
DTSサーバーがデータを送信するときのRPSの数。
inBps
DTSサーバーがデータを送信するときに1秒あたりに送信されるビット数。
__dt
SDKクライアントがデータを受信したときの現在のタイムスタンプ。 単位:ミリ秒。
DefaultUserRecordQueue
シリアル化後の現在のデータキャッシュキューのサイズ。
オプション: 追跡するデータのデータ型を変更するには、
buildRecordListener()
メソッドのコードを変更するか、カスタムクラスを使用します。public static Map<String, RecordListener> buildRecordListener() { // ユーザーは自分のリスナーをimplできます RecordListener mysqlRecordPrintListener = new RecordListener() { @Override public void consume(DefaultUserRecordレコード) { OperationType operationType = record.getOperationType(); if (operationType.equals(OperationType.INSERT) | | operationType.equals(OperationType.UPDATE) | | operationType.equals(OperationType.DELETE) | | operationType.equals(OperationType.HEARTBEAT)) { // レコードを消費する RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL); recordPrintListener.consume (レコード); // チェックポイント更新をプッシュするコミットメソッド record.com mit(""); } } }; Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener) を返します。 }