変更追跡チャネルの追跡タスクと使用者グループを作成した後、Data Transmission Service (DTS) SDK を使用して追跡データをコンシュームできます。このトピックでは、提供されているサンプルコードの使用方法について説明します。
PolarDB-X 1.0 インスタンスまたは Data Management (DMS) 論理データベースから追跡されたデータをコンシュームする方法については、「SDK デモを使用して PolarDB-X 1.0 インスタンスから追跡されたデータをコンシュームする」をご参照ください。
このトピックでは、Java SDK クライアントを例として使用します。Python および Go のサンプルコードについては、「dts-subscribe-demo」をご参照ください。
前提条件
ステータスが 通常 のサブスクリプションインスタンスを作成済みであること。
説明サブスクリプションインスタンスの作成方法の詳細については、「サブスクリプションシナリオの概要」をご参照ください。
変更追跡インスタンスの使用者グループが作成されていること。詳細については、「使用者グループの作成」をご参照ください。
サブスクライブしたデータをコンシュームするには、RAM ユーザーに [AliyunDTSFullAccess] 権限とソースオブジェクトにアクセスするための権限が必要です。これらの権限を付与する方法の詳細については、「システムポリシーを使用して RAM ユーザーに DTS インスタンスの管理を承認する」および「RAM ユーザーへの権限付与」をご参照ください。
注意事項
追跡データをコンシュームする際、DefaultUserRecord の commit メソッドを呼び出してオフセットをコミットする必要があります。そうしないと、データが繰り返しコンシュームされます。
異なるコンシュームプロセスは互いに独立しています。
コンソールの [現在のオフセット] は、クライアントによって送信されたオフセットではなく、追跡タスクの現在のオフセットを示します。
手順
SDK サンプルコードファイルをダウンロードし、ファイルを解凍します。
SDK コードのバージョンを確認します。
ファイルを解凍したディレクトリに移動します。
テキストエディターを使用して、ディレクトリ内の pom.xml ファイルを開きます。
変更追跡 SDK のバージョン (version) を最新バージョンに変更します。
説明最新の Maven 依存関係は、dts-new-subscribe-sdk ページで確認できます。
SDK コードを編集します。
コードエディターを使用して、解凍したファイルを開きます。
SDK クライアントのコンシュームモードに対応する Java ファイルを開きます。
説明Java ファイルのパスは
aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/です。使用モード
Java ファイル
説明
シナリオ
ASSIGN モード
DTSConsumerAssignDemo.java
メッセージのグローバルな順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを ASSIGN モードで使用する場合、SDK クライアントを 1 つだけ起動することをお勧めします。
同じ使用者グループ内で 1 つの SDK クライアントのみが追跡データをコンシュームします。
SUBSCRIBE モード
DTSConsumerSubscribeDemo.java
メッセージのグローバルな順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを SUBSCRIBE モードで使用する場合、1 つの使用者グループで複数の SDK クライアントを同時に起動してディザスタリカバリを実装できます。これは、使用者グループでデータをコンシュームしているクライアントに障害が発生した場合、他の SDK クライアントがランダムかつ自動的にパーティション 0 に割り当てられ、コンシュームを継続するためです。
同じ使用者グループ内で複数の SDK クライアントが追跡データをコンシュームします。これは、データディザスタリカバリシナリオに適用できます。
Java コードでパラメーターを設定します。
パラメーター
説明
取得方法
brokerUrl変更追跡チャネルのネットワークアドレスとポート番号。
説明SDK クライアントをデプロイするサーバー (ECS インスタンスなど) と変更追跡インスタンスが同じ VPC にある場合は、データコンシュームに VPC アドレスを使用してネットワーク遅延を削減することをお勧めします。
ネットワークが不安定になる可能性があるため、パブリックエンドポイントの使用は推奨されません。
DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。これにより 基本情報 ページが開き、ネットワーク セクションからエンドポイントとポート番号を取得できます。
topic変更追跡チャネルの追跡 Topic。
DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。基本情報 ページで、基本情報 セクションの トピック を見つけます。
sid使用者グループ ID。
DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。データ消費 ページで、使用者グループの コンシューマーグループ ID /名前 と アカウント を取得します。
userName使用者グループアカウントのユーザー名。
警告このトピックで提供されているクライアントを使用していない場合は、ユーザー名を
<使用者グループのユーザー名>-<使用者グループ ID>形式に設定してください。例:dtstest-dtsae******bpv。そうしないと、接続は失敗します。passwordアカウントのパスワード。
使用者グループを作成したときに設定した使用者グループアカウントのパスワード。
initCheckpointコンシューマオフセット。このパラメーターは、SDK クライアントがコンシュームする最初のデータレコードのタイムスタンプを指定します。値は UNIX タイムスタンプ (例: 1620962769) である必要があります。
説明コンシューマオフセットは、次の目的で使用できます:
アプリケーションの中断後、最後にコンシュームされたオフセットからデータコンシュームを続行します。これにより、データの損失を防ぎます。
クライアントの起動時に特定のオフセットからデータコンシュームを開始します。これにより、特定の時点からデータをコンシュームできます。
コンシューマオフセットは、変更追跡インスタンスのタイムスタンプ範囲内である必要があり、UNIX タイムスタンプに変換する必要があります。
説明対象の変更追跡インスタンスのデータ範囲は、追跡タスクリストの データ範囲 列で確認できます。
subscribeModeSDK クライアントのコンシュームモード。このパラメーターを変更する必要はありません。
ConsumerContext.ConsumerSubscribeMode.ASSIGN: ASSIGN モード。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE: SUBSCRIBE モード。
なし
コードエディターでプロジェクト構造を開き、このプロジェクトの OpenJDK バージョンが 1.8 であることを確認します。
クライアントコードを実行します。
説明初めてコードを実行するとき、コードエディターが必要なプラグインと依存関係を自動的にロードするのに時間がかかる場合があります。
SDK クライアントは、データコンシュームに関する統計情報を定期的に収集して表示します。この情報には、送受信されたデータレコードの総数、総データ量、および受信した 1 秒あたりのレコード数 (RPS) が含まれます。
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}パラメーター
説明
outCountsSDK クライアントによってコンシュームされたデータレコードの総数。
outBytesSDK クライアントによってコンシュームされたデータの総量 (バイト単位)。
outRpsSDK クライアントがデータをコンシュームするときの RPS。
outBpsSDK クライアントがデータをコンシュームするときの 1 秒あたりの伝送ビット数。
countSDK クライアントのデータコンシューム情報 (メトリック) 内のパラメーターの総数。
説明これには
count自体は含まれません。inBytesDTS サーバーから送信されたデータの総量 (バイト単位)。
DStoreRecordQueueDTS サーバーがデータを送信するときのデータキャッシュキューの現在のサイズ。
inCountsDTS サーバーから送信されたデータレコードの総数。
inBpsDTS サーバーがデータを送信するときの 1 秒あたりの伝送ビット数。
inRpsDTS サーバーがデータを送信するときの RPS。
__dtSDK クライアントがデータを受信したときの現在のタイムスタンプ (ミリ秒単位)。
DefaultUserRecordQueueシリアル化後のデータキャッシュキューの現在のサイズ。
必要に応じてコードを編集して、追跡データをコンシュームします。
サブスクライブしたデータをコンシュームする際、コンシューマオフセットを管理する必要があります。この方法により、データの損失や重複を防ぎ、オンデマンドでのコンシュームが可能になります。
よくある質問
変更追跡インスタンスに接続できない場合はどうすればよいですか?
エラーメッセージに基づいて問題をトラブルシューティングします。詳細については、「トラブルシューティング」をご参照ください。
永続化後のコンシューマオフセットのデータ形式は何ですか?
コンシューマオフセットが永続化されると、JSON 形式で返されます。永続化されたコンシューマオフセットは UNIX タイムスタンプであり、SDK に直接渡すことができます。次の例では、
"timestamp"の後の1700709977が永続化されたコンシューマオフセットです。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}追跡タスクは複数のクライアントによる並列コンシュームをサポートしていますか?
いいえ。SUBSCRIBE モードでは複数のクライアントを並列で実行できますが、常に 1 つのクライアントのみがアクティブにデータをコンシュームできます。
SDK コードにはどのバージョンの Kafka クライアントがカプセル化されていますか?
dts-new-subscribe-sdk のバージョン 2.0.0 以降は、Kafka クライアント (kafka-clients) のバージョン 2.7.0 をカプセル化しています。2.0.0 より前のバージョンは、Kafka クライアントのバージョン 1.0.0 をカプセル化しています。
付録
コンシューマオフセットの管理
SDK クライアントが初めて起動、再起動、または内部リトライを実行する場合、コンシューマオフセットをクエリして渡し、データコンシュームを開始または再開する必要があります。コンシューマオフセットは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです。
クライアントのコンシューマオフセットをリセットするには、次の表に示すように、コンシュームモード (SDK 使用モード) に基づいてコンシューマオフセットをクエリおよび変更できます。
シナリオ | SDK 使用モード | オフセット管理方法 |
コンシューマオフセットのクエリ | ASSIGN モード、SUBSCRIBE モード |
|
SDK クライアントを初めて起動するときに、コンシューマオフセットを渡してデータをコンシュームする。 | ASSIGN モード、SUBSCRIBE モード | SDK クライアントの使用モードに基づいて、DTSConsumerAssignDemo.java または DTSConsumerSubscribeDemo.java ファイルを選択します。次に、コンシューマオフセットを設定 ( |
SDK クライアントは、内部リトライのためにデータコンシュームを継続するために、最後に記録されたコンシューマオフセットを再 する必要があります。 | ASSIGN モード | 次の順序で最後に記録されたコンシューマオフセットを検索します。見つかると、オフセット情報が返されます:
|
SUBSCRIBE モード | 次の順序で最後に記録されたコンシューマオフセットを検索します。見つかると、オフセット情報が返されます:
| |
SDK クライアントを再起動した後、最後に記録されたコンシューマオフセットを再 してデータコンシュームを続行します。 | ASSIGN モード | consumerContext.java ファイルの
|
SUBSCRIBE モード | このモードでは、consumerContext.java ファイルの
|
コンシューマオフセットの永続ストレージの使用
増分データ収集モジュールでディザスタリカバリメカニズムがトリガーされた場合、特に SUBSCRIBE モードでは、新しいモジュールはクライアントの最後のコンシューマオフセットを取得できません。これにより、クライアントが古いオフセットからデータのコンシュームを開始し、既存データの繰り返しコンシュームが発生する可能性があります。たとえば、サービススイッチオーバーの前に、古いモジュールのオフセット範囲が 2023 年 11 月 11 日 08:00:00 から 2023 年 11 月 12 日 08:00:00 までであるとします。クライアントのコンシューマオフセットはこの範囲の最後にあります: 2023 年 11 月 12 日 08:00:00。スイッチオーバー後、新しいモジュールのオフセット範囲は 2023 年 11 月 8 日 10:00:00 から 2023 年 11 月 12 日 08:01:00 までです。このシナリオでは、クライアントは新しいモジュールの開始オフセット (2023 年 11 月 8 日 10:00:00) からデータのコンシュームを開始するため、既存データが再度コンシュームされます。
このスイッチオーバーシナリオで既存データの繰り返しコンシュームを避けるために、クライアントでコンシューマオフセットの永続ストレージ方法を設定することをお勧めします。次のセクションでは、要件に基づいて変更できるサンプルメソッドを提供します。
AbstractUserMetaStore()メソッドを継承して実装するUserMetaStore()メソッドを作成します。たとえば、MySQL データベースを使用してオフセット情報を保存する場合、Java サンプルコードは次のようになります:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }consumerContext.java ファイルで、
setUserRegisteredStore(new UserMetaStore())メソッドを使用して外部ストレージ媒体を設定します。
トラブルシューティング
エラー | エラーメッセージ | 原因 | 解決策 |
接続に失敗しました | |
|
|
| ブローカーアドレスを介して実 IP アドレスに接続できません。 | ||
| ユーザー名またはパスワードが正しくありません。 | ||
| consumerContext.java ファイルで、 | 変更追跡インスタンスのデータ範囲内のコンシューマオフセットを指定します。詳細については、このトピックの「パラメーターの説明」表をご参照ください。 | |
データコンシュームが遅くなる | なし |
| |