すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:SDK を使用したサブスクライブデータのコンシューム

最終更新日:Jan 23, 2026

追跡タスクと使用者グループを作成して変更追跡チャネルを設定した後、Data Transmission Service (DTS) が提供するソフトウェア開発キット (SDK) を使用して、サブスクライブしたデータをコンシュームできます。このトピックでは、サンプルコードの使用方法について説明します。

説明

前提条件

注意事項

  • サブスクライブデータをコンシュームする際は、`DefaultUserRecord` の commit メソッドを呼び出してオフセット情報をコミットする必要があります。そうしないと、データが繰り返しコンシュームされる可能性があります。

  • 異なるコンシュームプロセスは互いに独立しています。

  • コンソールの [現在のオフセット] は、クライアントによってコミットされたオフセットではなく、追跡タスクがサブスクライブしたオフセットを示します。

操作手順

  1. サンプル SDK コードファイルをダウンロードし、パッケージを解凍します。

  2. SDK コードのバージョンを確認します。

    1. サンプル SDK コードを解凍したディレクトリに移動します。

    2. テキストエディターで、ディレクトリ内の pom.xml ファイルを開きます。

    3. 変更追跡 SDK を最新バージョンに更新します。

      説明

      最新の Maven 依存関係は、dts-new-subscribe-sdk ページで確認できます。

      SDK バージョンパラメーターの場所 (クリックして展開)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. SDK コードを編集します。

    1. 統合開発環境 (IDE) を使用して、解凍したファイルを開きます。

    2. SDK クライアントを使用したいモードに対応する Java ファイルを開きます。

      説明

      Java ファイルのパスは aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/ です。

      使用モード

      Java ファイル

      説明

      シナリオ

      ASSIGN モード

      DTSConsumerAssignDemo.java

      グローバルなメッセージ順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを ASSIGN モードで使用する場合、SDK クライアントは 1 つだけ起動してください。

      1 つの使用者グループ内の 1 つの SDK クライアントのみがサブスクライブデータをコンシュームします。

      SUBSCRIBE モード

      DTSConsumerSubscribeDemo.java

      グローバルなメッセージ順序を保証するため、DTS は各追跡 Topic に 1 つのパーティション (パーティション 0) のみを割り当てます。SDK クライアントを SUBSCRIBE モードで使用する場合、ディザスタリカバリのために同じ使用者グループ内で複数の SDK クライアントを起動できます。データをコンシュームしているクライアントに障害が発生した場合、別の SDK クライアントが自動的にランダムにパーティション 0 に割り当てられ、コンシュームを継続します。

      同じ使用者グループ内の複数の SDK クライアントがサブスクライブデータをコンシュームします。これはデータディザスタリカバリのシナリオです。

    3. Java コードでパラメーターを設定します。

      サンプルコード

      ******        
          public static void main(String[] args) {
              // Kafka ブローカーの URL。
              String brokerUrl = "dts-cn-***.com:18001";
              // データをコンシュームする Topic。パーティションは 0 です。
              String topic = "cn_***_version2";
              // 認証用のユーザー名、パスワード、SID。
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // 最初のシークの初期チェックポイント。これは UNIX タイムスタンプです。例えば、2019 年 8 月 19 日 (月) 10:03:21 (CST) からコンシュームを開始したい場合は、このパラメーターを 1566180200 に設定します。
              String initCheckpoint = "1740472***";
              // SUBSCRIBE モードを使用する場合、グループを設定する必要があります。Kafka コンシューマーグループが有効になります。
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      パラメーター

      説明

      取得方法

      brokerUrl

      変更追跡チャネルのネットワークアドレスとポート番号を指定します。

      説明
      • SDK クライアントをデプロイするサーバー (ECS インスタンスなど) と変更追跡インスタンスが同じ Virtual Private Cloud (VPC) 内にある場合、VPC 経由でデータをコンシュームしてネットワーク遅延を削減できます。

      • ネットワークが不安定になる可能性があるため、パブリックエンドポイントの使用は推奨されません。

      DTS コンソールで、対象のサブスクリプションインスタンスの ID をクリックします。基本情報 ページで、ネットワーク セクションからネットワークアドレスとポート番号を取得します。

      topic

      変更追跡チャネルの Topic。

      DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。基本情報 ページの 基本情報 セクションで、トピック を取得します。

      sid

      使用者グループの ID。

      DTS コンソールで、対象のサブスクリプションインスタンス ID をクリックします。データ消費 ページで、コンシューマーグループ ID /名前アカウント を取得します。

      userName

      使用者グループのユーザー名。

      警告

      このトピックで提供されているクライアントを使用しない場合は、ユーザー名を <使用者グループのユーザー名>-<使用者グループ ID> の形式 (例:dtstest-dtsae******bpv) で設定してください。そうしないと、接続に失敗します。

      password

      アカウントのパスワード。

      使用者グループを作成する際に、使用者グループのユーザー名に設定したパスワード。

      initCheckpoint

      コンシューマオフセット。これは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです (例:1620962769)。

      説明

      コンシューマオフセットは、以下の目的で使用できます:

      • アプリケーションが中断された後、特定のオフセットからデータコンシュームを再開してデータ損失を防ぐ。

      • 必要に応じてデータコンシュームの開始オフセットを調整する。

      コンシューマオフセットは、追跡インスタンスのタイムスタンプ範囲内である必要があり、UNIX タイムスタンプに変換する必要があります。

      説明

      追跡タスクリストの データ範囲 列には、対象のサブスクリプションインスタンスのタイムスタンプ範囲が表示されます。

      subscribeMode

      SDK クライアントの使用モード。このパラメーターを変更する必要はありません。

      • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN モード。

      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE モード。

      N/A

  4. IDE でプロジェクト構造を開き、プロジェクトの OpenJDK バージョンが 1.8 であることを確認します。

  5. クライアントコードを実行します。

    説明

    初めてコードを実行する際、IDE が必要なプラグインと依存関係を自動的にロードするために時間がかかります。

    結果の例 (クリックして展開)

    正常な実行結果

    以下の結果が返された場合、クライアントは正常に実行されており、ソースデータベースからのデータ変更をサブスクライブする準備ができています。

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    正常なサブスクリプション結果

    以下の結果が返された場合、クライアントはソースデータベースからのデータ変更 (UPDATE 操作) を正常にサブスクライブしています。

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    異常な実行結果

    以下の結果が返された場合、クライアントはソースデータベースに接続できません。

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    SDK クライアントは定期的にデータコンシュームに関する統計情報を収集し、表示します。これらの統計情報には、送受信されたデータレコードの総数、総データ量、1 秒あたりのレコード数 (RPS) が含まれます。

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    パラメーター

    説明

    outCounts

    SDK クライアントによってコンシュームされたデータレコードの総数。

    outBytes

    SDK クライアントによってコンシュームされたデータの総量 (バイト単位)。

    outRps

    SDK クライアントによるデータコンシュームの 1 秒あたりのリクエスト数。

    outBps

    SDK クライアントによるデータコンシュームの 1 秒あたりの転送ビット数。

    count

    データコンシューム情報 (メトリック) のパラメーターの総数。

    説明

    これには count 自体は含まれません。

    inBytes

    DTS サーバーから送信されたデータの総量 (バイト単位)。

    DStoreRecordQueue

    DTS サーバーがデータを送信する際のデータキャッシュキューの現在のサイズ。

    inCounts

    DTS サーバーから送信されたデータレコードの総数。

    inBps

    DTS サーバーがデータを送信する際の 1 秒あたりの転送ビット数。

    inRps

    DTS サーバーがデータを送信する際の 1 秒あたりのリクエスト数。

    __dt

    SDK クライアントがデータを受信したときのタイムスタンプ (ミリ秒単位)。

    DefaultUserRecordQueue

    シリアル化後のデータキャッシュキューのサイズ。

  6. 必要に応じて、サブスクライブしたデータをコンシュームするようにコードを編集します。

    サブスクライブしたデータをコンシュームする際には、データ損失の防止、データ重複の最小化、オンデマンドでのコンシュームを可能にするために、コンシューマオフセットを管理する必要があります。

よくある質問

  • サブスクリプションインスタンスに接続できない場合はどうすればよいですか?

    エラーメッセージに基づいて問題をトラブルシューティングします。詳細については、「トラブルシューティング」をご参照ください。

  • コンシューマオフセットを永続化した後のデータ形式は何ですか?

    コンシューマオフセットが永続化されると、データは JSON 形式で返されます。永続化されたコンシューマオフセットは Unix タイムスタンプであり、SDK に直接渡すことができます。例えば、返されたデータでは、"timestamp" キーの値 1700709977 が永続化されたコンシューマオフセットです。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • 1 つの追跡タスクを複数のクライアントで並行してコンシュームできますか?

    いいえ。SUBSCRIBE モードでは複数のクライアントを並行して実行できますが、一度にデータをコンシュームできるのは 1 つのクライアントのみです。

  • SDK コードにはどのバージョンの Kafka クライアントがカプセル化されていますか?

    dts-new-subscribe-sdk のバージョン 2.0.0 以降では、Kafka クライアント (kafka-clients) 2.7.0 がカプセル化されています。バージョン 2.0.0 より前のバージョンでは、Kafka クライアント 1.0.0 がカプセル化されています。

    説明

    アプリケーション開発プロセスで依存パッケージの脆弱性検出ツールを使用し、dts-new-subscribe-sdk によってカプセル化された Kafka クライアント (kafka-clients) にセキュリティ脆弱性が見つかった場合、クライアントを 2.1.4-shaded バージョンに置き換えることでこの脆弱性を解決できます。

    <dependency>
        <groupId>com.aliyun.dts</groupId>
        <artifactId>dts-new-subscribe-sdk</artifactId>
        <version>2.1.4-shaded</version>
    </dependency>

付録

コンシューマオフセットの管理

SDK クライアントが初めて起動、再起動、または内部でリトライする場合、データコンシュームを開始または再開するためにコンシューマオフセットをクエリして渡す必要があります。コンシューマオフセットは、SDK クライアントがコンシュームする最初のデータレコードの UNIX タイムスタンプです。

クライアントのコンシューマオフセットをリセットするには、以下の表で説明されているように、コンシュームモード (SDK 使用モード) に基づいてコンシューマオフセットをクエリおよび変更できます。

シナリオ

SDK 使用モード

オフセット管理方法

コンシューマオフセットのクエリ

ASSIGN モード、SUBSCRIBE モード

  • SDK クライアントは 5 秒ごとにメッセージオフセットを保存し、DTS サーバーにコミットするため、以下のいずれかの場所から最後のコンシューマオフセットをクエリできます:

    • SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

    • 変更追跡チャンネルの[データ消費]ページ

  • consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して、データベースなどの外部の永続的な共有ストレージ媒体を設定した場合、このストレージ媒体はクエリ用に 5 秒ごとにメッセージオフセットを保存します。

SDK クライアントが初めて起動します。データをコンシュームするには、コンシューマオフセットを渡す必要があります。

ASSIGN モード、SUBSCRIBE モード

SDK クライアントの使用パターンに応じて、Java ファイル DTSConsumerAssignDemo.java または DTSConsumerSubscribeDemo.java を選択し、データをコンシュームするためにコンシューマオフセットを設定 (initCheckpoint) します。

SDK クライアントが内部でリトライします。データコンシュームを再開するには、最後に記録されたコンシューマオフセットを渡す必要があります。

ASSIGN モード

以下の順序で最後に記録されたコンシューマオフセットを検索します。オフセットが見つかった場合、オフセット情報が返されます:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint に渡す開始タイムスタンプ。

SUBSCRIBE モード

以下の順序で最後に記録されたコンシューマオフセットを検索します。オフセットが見つかった場合、オフセット情報が返されます:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. DTS サーバー (増分データ取り込みモジュール) に保存されているオフセット。

    説明

    このオフセットは、SDK クライアントが commit メソッドを呼び出してコンシューマオフセットを更新した後にのみ更新されます。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint に渡す開始タイムスタンプ。

  4. DTS サーバー (新しい増分データ取り込みモジュール) の開始オフセット。

    重要

    増分データ取り込みモジュールが切り替わった場合、新しいモジュールはクライアントの最後のコンシューマオフセットを保存できません。これにより、データコンシュームが古いオフセットから開始される可能性があります。クライアント側でコンシューマオフセットを永続的に保存することを推奨します。

SDK クライアントが再起動されます。データコンシュームを再開するには、最後に記録されたコンシューマオフセットを渡す必要があります。

ASSIGN モード

consumerContext.java ファイルの setForceUseCheckpoint 設定に基づいてコンシューマオフセットがクエリされ、見つかった場合はオフセット情報が返されます:

  • true に設定すると、SDK クライアントは再起動するたびに渡された initCheckpoint をコンシューマオフセットとして使用します。

  • false に設定するか、設定しない場合、以下の順序で前のレコードのコンシューマオフセットを検索します:

    1. SDK クライアントが配置されているサーバー上の localCheckpointStore ファイル。

    2. DTS サーバー (増分データ取り込みモジュール) に保存されているオフセット。

      説明

      このオフセットは、SDK クライアントが commit メソッドを呼び出してコンシューマオフセットを更新した後にのみ更新されます。

    3. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

SUBSCRIBE モード

このモードでは、consumerContext.java ファイルの setForceUseCheckpoint 設定は有効になりません。以下の順序で前のレコードのコンシューマオフセットを検索します:

  1. consumerContext.java ファイルで setUserRegisteredStore(new UserMetaStore()) を使用して設定した外部ストレージ媒体。

  2. DTS サーバー (増分データ取り込みモジュール) に保存されているオフセット。

    説明

    このオフセットは、SDK クライアントが commit メソッドを呼び出してコンシューマオフセットを更新した後にのみ更新されます。

  3. DTSConsumerSubscribeDemo.java ファイルで initCheckpoint に渡す開始タイムスタンプ。

  4. DTS サーバー (新しい増分データ取り込みモジュール) の開始オフセット。

コンシューマオフセットの永続的な保存

増分データ取り込みモジュールでディザスタリカバリのスイッチオーバーが発生した場合、新しいモジュールはクライアントの最後のコンシューマオフセットを保存できません。これは特に SUBSCRIBE モードで顕著です。その結果、クライアントはより早いオフセットからデータコンシュームを開始し、既存データの繰り返しコンシュームにつながる可能性があります。例えば、サービススイッチオーバー前に、古いモジュールのオフセット範囲が 2023 年 11 月 11 日 08:00:00 から 2023 年 11 月 12 日 08:00:00 で、クライアントのコンシューマオフセットが 2023 年 11 月 12 日 08:00:00 であったとします。スイッチオーバー後、新しいモジュールのオフセット範囲は 2023 年 11 月 08 日 10:00:00 から 2023 年 11 月 12 日 08:01:00 になります。このシナリオでは、クライアントは新しいモジュールの開始オフセット (2023 年 11 月 08 日 10:00:00) からコンシュームを開始するため、データの繰り返しコンシュームが発生します。

このスイッチオーバーシナリオでの既存データの繰り返しコンシュームを防ぐために、クライアント側でコンシューマオフセットの永続的な保存方法を設定することを推奨します。以下のサンプルメソッドを参考に、必要に応じて変更してください。

  1. AbstractUserMetaStore() メソッドを継承して実装する UserMetaStore() メソッドを作成します。

    例えば、MySQL データベースを使用してオフセット情報を保存できます。以下のサンプル Java コードはその方法を示しています:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
            String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
            String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                rs = pres.executeQuery();
                              
                if (rs.next()) {
                    String checkpoint = rs.getString("checkpoint");
                    return checkpoint;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
            return null;
        }
    }
    
  2. consumerContext.java ファイルで、setUserRegisteredStore(new UserMetaStore()) メソッドを呼び出して外部ストレージ媒体を設定します。

トラブルシューティング

例外

エラーメッセージ

原因

ソリューション

接続失敗

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl が正しくありません。

正しい brokerUrluserNamepassword を入力してください。詳細については、「パラメーターの説明」をご参照ください。

telnet real node *** failed, please check the network

ブローカーアドレスを使用して実ノードの IP アドレスに接続できません。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

ユーザー名またはパスワードが正しくありません。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java ファイルで、setUseCheckpointtrue に設定されていますが、コンシューマオフセットがサブスクリプションインスタンスのタイムスタンプ範囲内にありません。

サブスクリプションインスタンスのタイムスタンプ範囲内のコンシューマオフセットを入力してください。クエリ方法の詳細については、「パラメーターの説明」をご参照ください。

サブスクリプションコンシュームの速度低下

N/A

  • DStoreRecordQueue および DefaultUserRecordQueue キューのサイズに関する統計情報のパラメーターをクエリして、データコンシュームが遅くなった原因を分析します。

    • DStoreRecordQueue の値が 0 のままである場合、DTS サーバーがデータをプルする速度が遅いです。

    • DefaultUserRecordQueue の値がデフォルト値の 512 のままである場合、SDK クライアントがデータをコンシュームする速度が遅いです。

  • 必要に応じて、コード内のコンシューマオフセット (initCheckpoint) を変更してオフセットをリセットします。