変更追跡タスクを設定した後、Data Transmission Service (DTS) が提供するSDKデモを使用して、データを追跡および使用できます。 このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。
手順
PolarDB-X 1.0インスタンスまたはデータ管理 (DMS) 論理データベースから追跡されたデータを使用する方法については、「SDKデモコードを使用してPolarDB-X 1.0インスタンスから追跡されたデータを使用する」をご参照ください。
RAMユーザーとしてデータを追跡して使用する場合、RAMユーザーにはAliyunDTSFullAccess権限と、ソースオブジェクトにアクセスするための権限が必要です。 権限を付与する方法の詳細については、「システムポリシーを使用してRAMユーザーにDTSインスタンスを管理する権限を付与する」および「RAM ユーザーへの権限付与」をご参照ください。
消費者グループは互いに独立しています。
このトピックでは、SDKデモを使用して追跡データを消費する方法について説明します。 この例では、Windows用IntelliJ IDEA Community Edition 2020.1が使用されています。
変更追跡タスクを作成します。 詳細については、「ApsaraDB RDS For MySQLインスタンスからのデータ変更の追跡」、「PolarDB for MySQLクラスターからのデータ変更の追跡」、または「自己管理型Oracleデータベースからのデータ変更の追跡」をご参照ください。
1つ以上のコンシューマグループを作成します。 詳細については、「コンシューマーグループの作成」をご参照ください。
ビジネス要件に基づいてSDKデモを使用します。
(推奨) パッケージ化された新しい変更追跡SDKの使用
IntelliJ IDEAを開き、[新しいプロジェクトの作成] をクリックします。
作成したプロジェクトで、pom.xmlファイルを見つけます。
次の依存関係をpom.xmlファイルに追加します。
<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>{dts_new_sdk_version}</version> </dependency>説明変更追跡SDKの最新バージョンは、dts-new-subscribe-sdkページで確認できます。
新しい変更追跡SDKを使用します。 デモコードの詳細については、「SDKデモコード」をご参照ください。
コードをカスタマイズして新しい変更追跡SDKを使用する
SDKデモパッケージをダウンロードし、パッケージを解凍します。
説明パッケージをダウンロードするには、
/> [ZIPのダウンロード] を選択します。 パッケージが解凍されているディレクトリに移動します。 次に、テキストエディターを使用してpom.xmlファイルを開き、SDKのバージョンを最新に変更します。
重要変更追跡SDKの最新バージョンは、Maven Webサイトから入手できます。 詳細については、変更追跡SDKのMavenページをご覧ください。
IntelliJ IDEAを開きます。 表示されるウィンドウで、[開く] または [インポート] をクリックします。

表示されるダイアログボックスで、パッケージが解凍されているディレクトリに移動し、pom.xmlファイルを見つけます。 次に、[OK] をクリックします。

表示されるダイアログボックスで、[プロジェクトとして開く] を選択します。
IntelliJ IDEAで、フォルダを展開してJavaファイルを見つけます。 次に、を使用するモードに基づいてJavaファイルをダブルクリックします。 SDKクライアントを使用します。DTSCusumerAssignDemo. javaおよびDTSCusumerSubscribeDemo. java Javaファイルが利用可能です。
説明DTSは、SDKクライアントを使用する次のモードをサポートしています。
ASSIGNモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SDKクライアントをASSIGNモードで使用する場合は、SDKクライアントを1つだけ起動することをお勧めします。
SUBSCRIBEモード: メッセージのグローバルな順序を確保するために、DTSは追跡対象の各トピックに1つのパーティション (パーティション0) のみを割り当てます。 SUBSCRIBEモードでは、コンシューマーグループ内の複数のSDKクライアントを同時に起動して、ディザスタリカバリを実装できます。 コンシューマーグループ内のSDKクライアントが失敗した場合、他のSDKクライアントはランダムに自動的にパーティション0に割り当てられ、データ消費が再開されます。
Javaファイルのコードに必要なパラメーターを設定します。

表 1. 次の表に、必要なパラメーターを示します。
パラメーター
説明
取得する方法
brokerUrl変更追跡インスタンスのエンドポイントとポート番号。
説明内部ネットワーク上のデータ変更を追跡する場合、ネットワーク遅延は最小限に抑えられます。 これは、SDKクライアントをデプロイするElastic Compute Service (ECS) インスタンスが、クラシックネットワークまたは変更追跡インスタンスと同じ仮想プライベートクラウド (VPC) に属している場合に適用されます。
新しいDTSコンソールで、インスタンスIDをクリックします。 [基本情報] ページで、[ネットワーク] セクションでエンドポイントとポート番号を取得できます。

トピック変更追跡インスタンスのトピックの名前。
DTSコンソールで、インスタンスIDをクリックします。 [基本情報] ページで、[基本情報] セクションで追跡対象のトピックを取得できます。

sidコンシューマーグループの ID です。
DTSコンソールで、インスタンスIDをクリックします。 左側のナビゲーションウィンドウで、[データの使用] をクリックします。 コンシューマーグループのIDとアカウントを取得できます。
説明コンシューマーグループアカウントのパスワードは、コンシューマーグループの作成時に自動的に指定されます。

userName消費者グループのアカウント。
警告このトピックで説明するSDKクライアントを使用していない場合は、このパラメーターを
<Username>-<Consumer group ID>の形式で指定する必要があります。 例:dtstest-dtsae ****** bpvそうしないと、接続は失敗します。パスワードアカウントのパスワードを入力します。
initCheckpoint消費チェックポイント。 SDKクライアントが最初のデータレコードを消費したときのタイムスタンプです。 この値は UNIX タイムスタンプです。 例: 1620962769。
説明消費チェックポイントは、次のシナリオで使用できます。
消費プロセスが中断された場合、消費チェックポイントを指定してデータ消費を再開できます。 これにより、データの損失を防ぐことができます。
変更追跡クライアントを起動するときに、オンデマンドでデータを消費する消費チェックポイントを指定できます。
次の図に示すように、消費チェックポイントは変更追跡インスタンスのデータ範囲内にある必要があります。 消費チェックポイントは、UNIXタイムスタンプに変換する必要があります。
説明検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。
ConsumerContext.ConsumerSubscribeModeSDKクライアントを使用するモード。 有効な値:
ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGNモードでは、コンシューマグループ内の1つのSDKクライアントのみが追跡データを消費できます。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBEモードでは、コンシューマーグループ内の複数のSDKクライアントを同時に起動して、ディザスタリカバリを実装できます。
非該当
IntelliJ IDEAの上部のナビゲーションバーで、 を選択してクライアントを実行します。
説明IntelliJ IDEAを初めて実行する場合、関連する依存関係をロードしてインストールするのに時間がかかります。
以下の図に結果を示します。 この結果は、SDKクライアントがソースデータベースからのデータ変更を追跡できることを示しています。

SDKクライアントは、消費されたデータに関する情報を定期的に計算して表示します。 この情報には、送受信されるデータレコードの総数、データの総量、および1秒あたりの要求数 (RPS) が含まれます。

表 2. 次の表に、情報のパラメーターを示します。
パラメーター
説明
outCountsSDKクライアントによって消費されたデータレコードの総数。
outBytesSDKクライアントによって消費されたデータの合計量。 単位はバイトです。
outRpsSDKクライアントがデータを消費するRPS。
outBpsSDKクライアントがデータを消費する1秒あたりの送信ビット数。
inBytesDTSサーバーによって送信されるデータの合計量。 単位はバイトです。
DStoreRecordQueueDTSサーバーがデータを送信するときの現在のデータキャッシュキューのサイズ。
カウントDTSサーバーによって送信されたデータレコードの総数。
inRpsDTSサーバーがデータを送信するRPS。
__dtSDKクライアントがデータを受信したときに生成されるタイムスタンプ。 単位:ミリ秒。
DefaultUserRecordQueueシリアル化後の現在のデータキャッシュキューのサイズ。
消費チェックポイントを保存して照会する
SDKクライアントが初めて起動または再起動された場合、または内部再試行が発生した場合は、 消費チェックポイント データ消費を開始または再開します。 次の表に、さまざまなシナリオで消費チェックポイントを管理およびクエリする方法を示します。 これにより、データの損失や重複データを防ぎ、オンデマンドでデータを消費できます。
シナリオ | SDKクライアントの使用モード | 照会方法 |
消費チェックポイントの照会 | 割り当ておよびSUBSCRIBE |
|
SDKクライアントを初めて起動するときは、データを消費するための消費チェックポイントを指定する必要があります。 | 割り当ておよびSUBSCRIBE | SDKクライアントを使用するモードに基づいて、DTConsumerAssignDemo. javaまたはDTConsumerSubscribeDemo. javaファイルを選択します。 次に、 |
内部再試行が発生した場合、データ消費を再開するには、前のデータレコードの消費チェックポイントを指定する必要があります。 | 割り当て | 次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。
|
サブスクリプション | 次の手順を実行して、前のデータレコードの消費チェックポイントを見つけます。
| |
SDKクライアントを再起動した後、データ消費を再開するには、最後のデータレコードの消費チェックポイントを指定する必要があります。 | 割り当て | consumerContext.javaファイルの
|
サブスクリプション | このモードでは、consumerContext.javaファイルの
|
トラブルシューティング
問題 | エラーメッセージ | 原因 | 解決策 |
接続に失敗しました。 | |
|
|
| ブローカーアドレスを実際のIPアドレスにリダイレクトすることはできません。 | ||
| 指定されたユーザー名またはパスワードは無効です。 | ||
| consumerContext.javaファイルの | 変更追跡インスタンスのデータ範囲内で消費チェックポイントを指定します。 詳細については、「」をご参照ください。次の表に、必要なパラメーターを示します。. | |
データ消費の応答時間が増加しました。 | 非該当 | 原因を分析するには、DStoreRecordQueueおよびDefaultUserRecordQueueパラメーターを照会します。 詳細については、「」をご参照ください。次の表に、情報のパラメーターを示します。.
| |