追跡タスクと使用者グループを作成して変更追跡チャネルを設定した後、Data Transmission Service (DTS) が提供するソフトウェア開発キット (SDK) を使用して、サブスクライブしたデータをコンシュームできます。このトピックでは、サンプルコードの使用方法について説明します。
PolarDB-X 1.0 または DMS LogicDB データソースからサブスクライブしたデータをコンシュームするには、「SDK を使用して PolarDB-X 1.0 からサブスクライブしたデータをコンシュームする」をご参照ください。
このトピックでは、Java の SDK クライアントのサンプルコードを提供します。Python と Go のサンプルコードについては、「dts-subscribe-demo」をご参照ください。
前提条件
サブスクリプションインスタンスが作成され、通常状態で実行されていること。
説明サブスクリプションインスタンスの作成手順については、「サブスクリプションプランの概要」をご参照ください。
サブスクリプションインスタンスの使用者グループを作成しました。
RAM ユーザーを使用してサブスクライブデータをコンシュームする場合、その RAM ユーザーには AliyunDTSFullAccess 権限と、サブスクライブ対象オブジェクトへのアクセス権限が必要です。詳細については、「システムポリシーを使用して DTS の管理権限を RAM ユーザーに付与する」および「RAM ユーザーの権限管理」をご参照ください。
注意事項
サブスクライブデータをコンシュームする際は、`DefaultUserRecord` の commit メソッドを呼び出してオフセット情報をコミットする必要があります。そうしないと、データが繰り返しコンシュームされる可能性があります。
異なるコンシュームプロセスは互いに独立しています。
コンソールの [現在のオフセット] は、クライアントによってコミットされたオフセットではなく、追跡タスクがサブスクライブしたオフセットを示します。
操作手順
サンプル SDK コードファイルをダウンロードし、パッケージを解凍します。
SDK コードのバージョンを確認します。
サンプル SDK コードを解凍したディレクトリに移動します。
テキストエディターで、ディレクトリ内の pom.xml ファイルを開きます。
変更追跡 SDK を最新バージョンに更新します。
説明最新の Maven 依存関係は、dts-new-subscribe-sdk ページで確認できます。
SDK コードを編集します。
統合開発環境 (IDE) を使用して、解凍したファイルを開きます。
SDK クライアントを使用したいモードに対応する Java ファイルを開きます。
説明Java ファイルのパスは
aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/です。使用モード
Java ファイル
説明
シナリオ
ASSIGN モード
DTSConsumerAssignDemo.java
グローバルなメッセージ順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを ASSIGN モードで使用する場合、SDK クライアントは 1 つだけ起動してください。
1 つの使用者グループ内の 1 つの SDK クライアントのみがサブスクライブデータをコンシュームします。
SUBSCRIBE モード
DTSConsumerSubscribeDemo.java
グローバルなメッセージ順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを SUBSCRIBE モードで使用する場合、ディザスタリカバリのために同じ使用者グループ内で複数の SDK クライアントを起動できます。データをコンシュームしているクライアントに障害が発生した場合、別の SDK クライアントが自動的にランダムにパーティション 0 に割り当てられ、コンシュームを継続します。
同じ使用者グループ内の複数の SDK クライアントがサブスクライブデータをコンシュームします。これはデータディザスタリカバリのシナリオです。
Java コードでパラメーターを設定します。
パラメーター
説明
取得方法
brokerUrl変更追跡チャネルのネットワークアドレスとポート番号を指定します。
説明SDK クライアントをデプロイするサーバー (ECS インスタンスなど) と変更追跡インスタンスが同じ Virtual Private Cloud (VPC) 内にある場合、VPC 経由でデータをコンシュームしてネットワーク遅延を削減できます。
ネットワークが不安定になる可能性があるため、パブリックエンドポイントの使用は推奨されません。
DTS コンソールで、対象のサブスクリプションインスタンスの ID をクリックします。基本情報 ページで、ネットワーク セクションからネットワークアドレスとポート番号を取得します。
topic変更追跡チャネルの Topic。
DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。基本情報 ページの 基本情報 セクションで、トピック を取得します。
sid使用者グループの ID。
DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。データ消費 ページで、コンシューマーグループ ID /名前 と アカウント を取得します。
userName使用者グループのユーザー名。
警告このトピックで提供されているクライアントを使用しない場合は、ユーザー名を
<使用者グループのユーザー名>-<使用者グループ ID>の形式 (例:dtstest-dtsae******bpv) で設定してください。そうしないと、接続に失敗します。passwordアカウントのパスワード。
使用者グループを作成する際に、使用者グループのユーザー名に設定したパスワード。
initCheckpointコンシューマオフセット。これは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです (例:1620962769)。
説明コンシューマオフセットは、以下の目的で使用できます:
アプリケーションが中断された後、特定のオフセットからデータコンシュームを再開してデータ損失を防ぐ。
必要に応じてデータコンシュームの開始オフセットを調整する。
コンシューマオフセットは、追跡インスタンスのタイムスタンプ範囲内である必要があり、UNIX タイムスタンプに変換する必要があります。
説明追跡タスクリストの データ範囲 列には、対象のサブスクリプションインスタンスのタイムスタンプ範囲が表示されます。
subscribeModeSDK クライアントの使用モード。このパラメーターを変更する必要はありません。
ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN モード。ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE モード。
N/A
IDE でプロジェクト構造を開き、プロジェクトの OpenJDK バージョンが 1.8 であることを確認します。
クライアントコードを実行します。
説明初めてコードを実行する際、IDE が必要なプラグインと依存関係を自動的にロードするために時間がかかります。
SDK クライアントは定期的にデータコンシュームに関する統計情報を収集し、表示します。これらの統計情報には、送受信されたデータレコードの総数、総データ量、1 秒あたりのレコード数 (RPS) が含まれます。
[2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}パラメーター
説明
outCountsSDK クライアントによってコンシュームされたデータレコードの総数。
outBytesSDK クライアントによってコンシュームされたデータの総量 (バイト単位)。
outRpsSDK クライアントによるデータコンシュームの 1 秒あたりのリクエスト数。
outBpsSDK クライアントによるデータコンシュームの 1 秒あたりの転送ビット数。
countデータコンシューム情報 (メトリック) のパラメーターの総数。
説明これには
count自体は含まれません。inBytesDTS サーバーから送信されたデータの総量 (バイト単位)。
DStoreRecordQueueDTS サーバーがデータを送信する際のデータキャッシュキューの現在のサイズ。
inCountsDTS サーバーから送信されたデータレコードの総数。
inBpsDTS サーバーがデータを送信する際の 1 秒あたりの転送ビット数。
inRpsDTS サーバーがデータを送信する際の 1 秒あたりのリクエスト数。
__dtSDK クライアントがデータを受信したときのタイムスタンプ (ミリ秒単位)。
DefaultUserRecordQueueシリアル化後のデータキャッシュキューのサイズ。
必要に応じて、サブスクライブしたデータをコンシュームするようにコードを編集します。
サブスクライブしたデータをコンシュームする際には、データ損失の防止、データ重複の最小化、オンデマンドでのコンシュームを可能にするために、コンシューマオフセットを管理する必要があります。
よくある質問
サブスクリプションインスタンスに接続できない場合はどうすればよいですか?
エラーメッセージに基づいて問題をトラブルシューティングします。詳細については、「トラブルシューティング」をご参照ください。
コンシューマオフセットを永続化した後のデータ形式は何ですか?
コンシューマオフセットが永続化されると、データは JSON 形式で返されます。永続化されたコンシューマオフセットは Unix タイムスタンプであり、SDK に直接渡すことができます。例えば、返されたデータでは、
"timestamp"キーの値1700709977が永続化されたコンシューマオフセットです。{"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}1 つの追跡タスクを複数のクライアントで並行してコンシュームできますか?
いいえ。SUBSCRIBE モードでは複数のクライアントを並行して実行できますが、一度にデータをコンシュームできるのは 1 つのクライアントのみです。
SDK コードにはどのバージョンの Kafka クライアントがカプセル化されていますか?
dts-new-subscribe-sdk のバージョン 2.0.0 以降では、Kafka クライアント (kafka-clients) 2.7.0 がカプセル化されています。バージョン 2.0.0 より前のバージョンでは、Kafka クライアント 1.0.0 がカプセル化されています。
説明アプリケーション開発プロセスで依存パッケージの脆弱性検出ツールを使用し、dts-new-subscribe-sdk によってカプセル化された Kafka クライアント (kafka-clients) にセキュリティ脆弱性が見つかった場合、クライアントを
2.1.4-shadedバージョンに置き換えることでこの脆弱性を解決できます。<dependency> <groupId>com.aliyun.dts</groupId> <artifactId>dts-new-subscribe-sdk</artifactId> <version>2.1.4-shaded</version> </dependency>
付録
コンシューマオフセットの管理
SDK クライアントが初めて起動、再起動、または内部でリトライする場合、データコンシュームを開始または再開するためにコンシューマオフセットをクエリして渡す必要があります。コンシューマオフセットは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです。
クライアントのコンシューマオフセットをリセットするには、以下の表で説明されているように、コンシュームモード (SDK 使用モード) に基づいてコンシューマオフセットをクエリおよび変更できます。
シナリオ | SDK 使用モード | オフセット管理方法 |
コンシューマオフセットのクエリ | ASSIGN モード、SUBSCRIBE モード |
|
SDK クライアントが初めて起動します。データをコンシュームするには、コンシューマオフセットを渡す必要があります。 | ASSIGN モード、SUBSCRIBE モード | SDK クライアントの使用パターンに応じて、Java ファイル DTSConsumerAssignDemo.java または DTSConsumerSubscribeDemo.java を選択し、データをコンシュームするためにコンシューマオフセットを設定 ( |
SDK クライアントが内部でリトライします。データコンシュームを再開するには、最後に記録されたコンシューマオフセットを渡す必要があります。 | ASSIGN モード | 以下の順序で最後に記録されたコンシューマオフセットを検索します。オフセットが見つかった場合、オフセット情報が返されます:
|
SUBSCRIBE モード | 以下の順序で最後に記録されたコンシューマオフセットを検索します。オフセットが見つかった場合、オフセット情報が返されます:
| |
SDK クライアントが再起動されます。データコンシュームを再開するには、最後に記録されたコンシューマオフセットを渡す必要があります。 | ASSIGN モード | consumerContext.java ファイルの
|
SUBSCRIBE モード | このモードでは、consumerContext.java ファイルの
|
コンシューマオフセットの永続的な保存
増分データ取り込みモジュールでディザスタリカバリのスイッチオーバーが発生した場合、新しいモジュールはクライアントの最後のコンシューマオフセットを保存できません。これは特に SUBSCRIBE モードで顕著です。その結果、クライアントはより早いオフセットからデータコンシュームを開始し、既存データの繰り返しコンシュームにつながる可能性があります。例えば、サービススイッチオーバー前に、古いモジュールのオフセット範囲が 2023 年 11 月 11 日 08:00:00 から 2023 年 11 月 12 日 08:00:00 で、クライアントのコンシューマオフセットが 2023 年 11 月 12 日 08:00:00 であったとします。スイッチオーバー後、新しいモジュールのオフセット範囲は 2023 年 11 月 08 日 10:00:00 から 2023 年 11 月 12 日 08:01:00 になります。このシナリオでは、クライアントは新しいモジュールの開始オフセット (2023 年 11 月 08 日 10:00:00) からコンシュームを開始するため、データの繰り返しコンシュームが発生します。
このスイッチオーバーシナリオでの既存データの繰り返しコンシュームを防ぐために、クライアント側でコンシューマオフセットの永続的な保存方法を設定することを推奨します。以下のサンプルメソッドを参考に、必要に応じて変更してください。
AbstractUserMetaStore()メソッドを継承して実装するUserMetaStore()メソッドを作成します。例えば、MySQL データベースを使用してオフセット情報を保存できます。以下のサンプル Java コードはその方法を示しています:
public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); rs = pres.executeQuery(); if (rs.next()) { String checkpoint = rs.getString("checkpoint"); return checkpoint; } } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } return null; } }consumerContext.java ファイルで、
setUserRegisteredStore(new UserMetaStore())メソッドを呼び出して外部ストレージ媒体を設定します。
トラブルシューティング
例外 | エラーメッセージ | 原因 | ソリューション |
接続失敗 | |
| 正しい |
| ブローカーアドレスを使用して実ノードの IP アドレスに接続できません。 | ||
| ユーザー名またはパスワードが正しくありません。 | ||
| consumerContext.java ファイルで、 | サブスクリプションインスタンスのタイムスタンプ範囲内のコンシューマオフセットを入力してください。クエリ方法の詳細については、「パラメーターの説明」をご参照ください。 | |
サブスクリプションコンシュームの速度低下 | N/A |
| |